accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mwa...@apache.org
Subject [7/7] accumulo-testing git commit: ACCUMULO-4510 Adding Randomwalk code from Accumulo
Date Tue, 03 Jan 2017 20:55:56 GMT
ACCUMULO-4510 Adding Randomwalk code from Accumulo

* All tests are in core module which creates shaded jar
* Shaded jar can be used on its own or submitted to YARN via Twill


Project: http://git-wip-us.apache.org/repos/asf/accumulo-testing/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-testing/commit/ac5b271c
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-testing/tree/ac5b271c
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-testing/diff/ac5b271c

Branch: refs/heads/master
Commit: ac5b271caec45083751efad7298852969bec387e
Parents: 89d6acb
Author: Mike Walch <mwalch@apache.org>
Authored: Thu Dec 22 10:39:30 2016 -0500
Committer: Mike Walch <mwalch@apache.org>
Committed: Tue Jan 3 15:50:50 2017 -0500

----------------------------------------------------------------------
 .gitignore                                      |   6 +
 core/.gitignore                                 |   2 +
 core/pom.xml                                    | 129 ++++
 .../testing/core/randomwalk/Environment.java    | 248 ++++++++
 .../testing/core/randomwalk/Fixture.java        |  28 +
 .../testing/core/randomwalk/Framework.java      | 109 ++++
 .../testing/core/randomwalk/Module.java         | 624 +++++++++++++++++++
 .../accumulo/testing/core/randomwalk/Node.java  | 100 +++
 .../accumulo/testing/core/randomwalk/State.java | 129 ++++
 .../accumulo/testing/core/randomwalk/Test.java  |  28 +
 .../core/randomwalk/bulk/BulkImportTest.java    |  85 +++
 .../core/randomwalk/bulk/BulkMinusOne.java      |  35 ++
 .../core/randomwalk/bulk/BulkPlusOne.java       | 117 ++++
 .../testing/core/randomwalk/bulk/BulkTest.java  |  44 ++
 .../testing/core/randomwalk/bulk/Compact.java   |  34 +
 .../core/randomwalk/bulk/ConsistencyCheck.java  |  57 ++
 .../testing/core/randomwalk/bulk/Merge.java     |  61 ++
 .../core/randomwalk/bulk/SelectiveBulkTest.java |  42 ++
 .../core/randomwalk/bulk/SelectiveQueueing.java |  50 ++
 .../testing/core/randomwalk/bulk/Setup.java     |  82 +++
 .../testing/core/randomwalk/bulk/Split.java     |  41 ++
 .../testing/core/randomwalk/bulk/Verify.java    | 148 +++++
 .../core/randomwalk/concurrent/AddSplits.java   |  62 ++
 .../core/randomwalk/concurrent/Apocalypse.java  |  34 +
 .../core/randomwalk/concurrent/BatchScan.java   |  84 +++
 .../core/randomwalk/concurrent/BatchWrite.java  |  82 +++
 .../core/randomwalk/concurrent/BulkImport.java  | 151 +++++
 .../concurrent/ChangeAuthorizations.java        |  63 ++
 .../concurrent/ChangePermissions.java           | 156 +++++
 .../randomwalk/concurrent/CheckPermission.java  |  70 +++
 .../core/randomwalk/concurrent/CloneTable.java  |  66 ++
 .../core/randomwalk/concurrent/Compact.java     |  57 ++
 .../concurrent/ConcurrentFixture.java           |  73 +++
 .../core/randomwalk/concurrent/Config.java      | 235 +++++++
 .../randomwalk/concurrent/CreateNamespace.java  |  49 ++
 .../core/randomwalk/concurrent/CreateTable.java |  61 ++
 .../core/randomwalk/concurrent/CreateUser.java  |  49 ++
 .../randomwalk/concurrent/DeleteNamespace.java  |  52 ++
 .../core/randomwalk/concurrent/DeleteRange.java |  66 ++
 .../core/randomwalk/concurrent/DeleteTable.java |  49 ++
 .../core/randomwalk/concurrent/DropUser.java    |  48 ++
 .../randomwalk/concurrent/IsolatedScan.java     |  74 +++
 .../core/randomwalk/concurrent/ListSplits.java  |  54 ++
 .../core/randomwalk/concurrent/Merge.java       |  59 ++
 .../randomwalk/concurrent/OfflineTable.java     |  56 ++
 .../randomwalk/concurrent/RenameNamespace.java  |  53 ++
 .../core/randomwalk/concurrent/RenameTable.java |  90 +++
 .../core/randomwalk/concurrent/Replication.java | 203 ++++++
 .../core/randomwalk/concurrent/ScanTable.java   |  72 +++
 .../core/randomwalk/concurrent/Setup.java       |  71 +++
 .../core/randomwalk/concurrent/Shutdown.java    |  63 ++
 .../core/randomwalk/concurrent/StartAll.java    |  58 ++
 .../randomwalk/concurrent/StopTabletServer.java |  84 +++
 .../core/randomwalk/conditional/Compact.java    |  48 ++
 .../core/randomwalk/conditional/Flush.java      |  48 ++
 .../core/randomwalk/conditional/Init.java       |  94 +++
 .../core/randomwalk/conditional/Merge.java      |  49 ++
 .../core/randomwalk/conditional/Setup.java      |  60 ++
 .../core/randomwalk/conditional/Split.java      |  45 ++
 .../core/randomwalk/conditional/TearDown.java   |  35 ++
 .../core/randomwalk/conditional/Transfer.java   | 135 ++++
 .../core/randomwalk/conditional/Utils.java      |  35 ++
 .../core/randomwalk/conditional/Verify.java     |  89 +++
 .../testing/core/randomwalk/image/Commit.java   |  35 ++
 .../core/randomwalk/image/ImageFixture.java     | 134 ++++
 .../testing/core/randomwalk/image/ScanMeta.java | 111 ++++
 .../testing/core/randomwalk/image/TableOp.java  |  81 +++
 .../testing/core/randomwalk/image/Verify.java   | 131 ++++
 .../testing/core/randomwalk/image/Write.java    |  97 +++
 .../core/randomwalk/multitable/Commit.java      |  40 ++
 .../core/randomwalk/multitable/CopyTable.java   |  92 +++
 .../core/randomwalk/multitable/CopyTool.java    | 131 ++++
 .../core/randomwalk/multitable/CreateTable.java |  67 ++
 .../core/randomwalk/multitable/DropTable.java   |  51 ++
 .../multitable/MultiTableFixture.java           |  74 +++
 .../randomwalk/multitable/OfflineTable.java     |  47 ++
 .../core/randomwalk/multitable/Write.java       |  89 +++
 .../randomwalk/security/AlterSystemPerm.java    | 101 +++
 .../core/randomwalk/security/AlterTable.java    |  74 +++
 .../randomwalk/security/AlterTablePerm.java     | 180 ++++++
 .../core/randomwalk/security/Authenticate.java  |  82 +++
 .../core/randomwalk/security/ChangePass.java    |  94 +++
 .../core/randomwalk/security/CreateTable.java   |  75 +++
 .../core/randomwalk/security/CreateUser.java    |  70 +++
 .../core/randomwalk/security/DropTable.java     |  87 +++
 .../core/randomwalk/security/DropUser.java      |  68 ++
 .../randomwalk/security/SecurityFixture.java    | 120 ++++
 .../randomwalk/security/SecurityHelper.java     | 215 +++++++
 .../core/randomwalk/security/SetAuths.java      | 100 +++
 .../core/randomwalk/security/TableOp.java       | 257 ++++++++
 .../core/randomwalk/security/Validate.java      | 124 ++++
 .../randomwalk/security/WalkingSecurity.java    | 505 +++++++++++++++
 .../core/randomwalk/sequential/BatchVerify.java | 132 ++++
 .../core/randomwalk/sequential/Commit.java      |  36 ++
 .../randomwalk/sequential/MapRedVerify.java     |  79 +++
 .../randomwalk/sequential/MapRedVerifyTool.java | 156 +++++
 .../sequential/SequentialFixture.java           |  80 +++
 .../core/randomwalk/sequential/Write.java       |  50 ++
 .../core/randomwalk/shard/BulkInsert.java       | 191 ++++++
 .../core/randomwalk/shard/CloneIndex.java       |  45 ++
 .../testing/core/randomwalk/shard/Commit.java   |  33 +
 .../core/randomwalk/shard/CompactFilter.java    |  94 +++
 .../testing/core/randomwalk/shard/Delete.java   |  58 ++
 .../core/randomwalk/shard/DeleteSomeDocs.java   |  80 +++
 .../core/randomwalk/shard/DeleteWord.java       |  97 +++
 .../core/randomwalk/shard/ExportIndex.java      | 118 ++++
 .../testing/core/randomwalk/shard/Flush.java    |  45 ++
 .../testing/core/randomwalk/shard/Grep.java     |  97 +++
 .../testing/core/randomwalk/shard/Insert.java   | 136 ++++
 .../testing/core/randomwalk/shard/Merge.java    |  49 ++
 .../testing/core/randomwalk/shard/Reindex.java  |  66 ++
 .../testing/core/randomwalk/shard/Search.java   | 105 ++++
 .../core/randomwalk/shard/ShardFixture.java     | 137 ++++
 .../testing/core/randomwalk/shard/SortTool.java |  75 +++
 .../testing/core/randomwalk/shard/Split.java    |  41 ++
 .../core/randomwalk/shard/VerifyIndex.java      |  71 +++
 .../core/randomwalk/unit/CreateTable.java       |  30 +
 .../core/randomwalk/unit/DeleteTable.java       |  29 +
 .../testing/core/randomwalk/unit/Ingest.java    |  29 +
 .../testing/core/randomwalk/unit/Scan.java      |  29 +
 .../testing/core/randomwalk/unit/Verify.java    |  29 +
 core/src/main/resources/randomwalk/module.xsd   |  69 ++
 .../main/resources/randomwalk/modules/All.xml   |  65 ++
 .../main/resources/randomwalk/modules/Bulk.xml  |  61 ++
 .../resources/randomwalk/modules/Concurrent.xml | 181 ++++++
 .../randomwalk/modules/Conditional.xml          |  74 +++
 .../main/resources/randomwalk/modules/Image.xml |  70 +++
 .../resources/randomwalk/modules/LongClean.xml  |  60 ++
 .../resources/randomwalk/modules/LongDirty.xml  |  60 ++
 .../resources/randomwalk/modules/LongEach.xml   |  54 ++
 .../resources/randomwalk/modules/MultiTable.xml |  60 ++
 .../resources/randomwalk/modules/Security.xml   | 224 +++++++
 .../resources/randomwalk/modules/Sequential.xml |  51 ++
 .../main/resources/randomwalk/modules/Shard.xml | 123 ++++
 .../resources/randomwalk/modules/ShortClean.xml |  60 ++
 .../resources/randomwalk/modules/ShortDirty.xml |  60 ++
 .../resources/randomwalk/modules/ShortEach.xml  |  54 ++
 .../resources/randomwalk/modules/unit/Basic.xml |  37 ++
 .../randomwalk/modules/unit/Simple.xml          |  43 ++
 .../testing/core/randomwalk/FrameworkTest.java  |  67 ++
 .../randomwalk/ReplicationRandomWalkIT.java     |  66 ++
 core/src/test/resources/log4j.properties        |  21 +
 pom.xml                                         | 172 +++++
 143 files changed, 12562 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..f534230
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,6 @@
+/.classpath
+/.project
+/.settings/
+/target/
+/*.iml
+/.idea

http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/.gitignore
----------------------------------------------------------------------
diff --git a/core/.gitignore b/core/.gitignore
new file mode 100644
index 0000000..17bb010
--- /dev/null
+++ b/core/.gitignore
@@ -0,0 +1,2 @@
+/target/
+/*.iml

http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/pom.xml
----------------------------------------------------------------------
diff --git a/core/pom.xml b/core/pom.xml
new file mode 100644
index 0000000..83998d5
--- /dev/null
+++ b/core/pom.xml
@@ -0,0 +1,129 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.accumulo</groupId>
+    <artifactId>accumulo-testing</artifactId>
+    <version>2.0.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>accumulo-testing-core</artifactId>
+  <packaging>jar</packaging>
+
+  <name>Apache Accumulo Testing Core</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>com.beust</groupId>
+      <artifactId>jcommander</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>commons-configuration</groupId>
+      <artifactId>commons-configuration</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.accumulo</groupId>
+      <artifactId>accumulo-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.accumulo</groupId>
+      <artifactId>accumulo-fate</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.accumulo</groupId>
+      <artifactId>accumulo-master</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.accumulo</groupId>
+      <artifactId>accumulo-minicluster</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.accumulo</groupId>
+      <artifactId>accumulo-test</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.zookeeper</groupId>
+      <artifactId>zookeeper</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <profiles>
+    <!-- Optional profile that builds a shaded jar of this module during the package phase.
+         NOTE(review): no <activation> element is declared here, so the profile presumably has to be
+         enabled explicitly (for example with -Pcreate-shade-jar); confirm against the parent pom. -->
+    <profile>
+      <id>create-shade-jar</id>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-shade-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>testing-shade-jar</id>
+                <goals>
+                  <goal>shade</goal>
+                </goals>
+                <phase>package</phase>
+                <configuration>
+                  <!-- Attach the shaded jar as an additional artifact with the "shaded" classifier
+                       rather than replacing the module's main jar. -->
+                  <shadedArtifactAttached>true</shadedArtifactAttached>
+                  <shadedClassifierName>shaded</shadedClassifierName>
+                  <!-- Keep the accumulo-native artifact out of the shaded jar. -->
+                  <artifactSet>
+                    <excludes>
+                      <exclude>org.apache.accumulo:accumulo-native</exclude>
+                    </excludes>
+                  </artifactSet>
+                  <!-- Drop jar signature files (*.SF, *.DSA, *.RSA) from every bundled artifact.
+                       NOTE(review): presumably because signatures copied from signed dependencies would no
+                       longer match the contents of the repackaged jar; confirm. -->
+                  <filters>
+                    <filter>
+                      <artifact>*:*</artifact>
+                      <excludes>
+                        <exclude>META-INF/*.SF</exclude>
+                        <exclude>META-INF/*.DSA</exclude>
+                        <exclude>META-INF/*.RSA</exclude>
+                      </excludes>
+                    </filter>
+                  </filters>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
+</project>

http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/Environment.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/Environment.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/Environment.java
new file mode 100644
index 0000000..5684353
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/Environment.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.randomwalk;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.MultiTableBatchWriter;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.KerberosToken;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The test environment that is available for randomwalk tests. This includes configuration properties that are available to any randomwalk test and facilities
+ * for creating client-side objects. This class is not thread-safe.
+ */
+public class Environment {
+  /**
+   * The configuration property key for a username.
+   */
+  public static final String KEY_USERNAME = "USERNAME";
+  /**
+   * The configuration property key for a password.
+   */
+  public static final String KEY_PASSWORD = "PASSWORD";
+  /**
+   * The configuration property key for a keytab.
+   */
+  public static final String KEY_KEYTAB = "KEYTAB";
+  /**
+   * The configuration property key for the instance name.
+   */
+  public static final String KEY_INSTANCE = "INSTANCE";
+  /**
+   * The configuration property key for the comma-separated list of ZooKeepers.
+   */
+  public static final String KEY_ZOOKEEPERS = "ZOOKEEPERS";
+  /**
+   * The configuration property key for the maximum memory for the multi-table batch writer.
+   */
+  public static final String KEY_MAX_MEM = "MAX_MEM";
+  /**
+   * The configuration property key for the maximum latency, in milliseconds, for the multi-table batch writer.
+   */
+  public static final String KEY_MAX_LATENCY = "MAX_LATENCY";
+  /**
+   * The configuration property key for the number of write threads for the multi-table batch writer.
+   */
+  public static final String KEY_NUM_THREADS = "NUM_THREADS";
+
+  private static final Logger log = LoggerFactory.getLogger(Environment.class);
+
+  private final Properties p;
+  // the three client-side objects below are created lazily on first request and then reused
+  private Instance instance = null;
+  private Connector connector = null;
+  private MultiTableBatchWriter mtbw = null;
+
+  /**
+   * Creates a new test environment.
+   *
+   * @param p
+   *          configuration properties
+   * @throws NullPointerException
+   *           if p is null
+   */
+  public Environment(Properties p) {
+    this.p = requireNonNull(p);
+  }
+
+  /**
+   * Gets a copy of the configuration properties. The result is an independent snapshot: every string-valued property that
+   * {@link Properties#getProperty(String)} can see on the original (including inherited defaults) is stored as a real entry of the copy, so it is also
+   * visible when the copy is iterated, and later changes to either object do not show up in the other.
+   *
+   * @return a copy of the configuration properties
+   */
+  Properties copyConfigProperties() {
+    // new Properties(p) would not copy anything: it creates an empty table that merely uses p as its defaults
+    Properties copy = new Properties();
+    for (String name : p.stringPropertyNames()) {
+      copy.setProperty(name, p.getProperty(name));
+    }
+    return copy;
+  }
+
+  /**
+   * Gets a configuration property.
+   *
+   * @param key
+   *          key
+   * @return property value, or null if the property is not set
+   */
+  public String getConfigProperty(String key) {
+    return p.getProperty(key);
+  }
+
+  /**
+   * Gets the configured username.
+   *
+   * @return username, or null if none is configured
+   */
+  public String getUserName() {
+    return p.getProperty(KEY_USERNAME);
+  }
+
+  /**
+   * Gets the configured password.
+   *
+   * @return password, or null if none is configured
+   */
+  public String getPassword() {
+    return p.getProperty(KEY_PASSWORD);
+  }
+
+  /**
+   * Gets the configured keytab.
+   *
+   * @return path to keytab, or null if none is configured
+   */
+  public String getKeytab() {
+    return p.getProperty(KEY_KEYTAB);
+  }
+
+  /**
+   * Gets this process's ID, taken as the part of the runtime MXBean name that precedes the first '@'.
+   *
+   * @return pid
+   */
+  public String getPid() {
+    return ManagementFactory.getRuntimeMXBean().getName().split("@")[0];
+  }
+
+  /**
+   * Gets an authentication token. A configured password takes precedence and yields a {@link PasswordToken}; otherwise a configured keytab is used to log in
+   * the configured user and a {@link KerberosToken} is returned.
+   *
+   * @return authentication token
+   * @throws IllegalArgumentException
+   *           if neither a password nor a keytab is configured, or if the keytab path does not refer to an existing normal file
+   * @throws RuntimeException
+   *           if logging in from the keytab or creating the Kerberos token fails with an IOException
+   */
+  public AuthenticationToken getToken() {
+    String password = getPassword();
+    if (null != password) {
+      return new PasswordToken(password);
+    }
+    String keytab = getKeytab();
+    if (null != keytab) {
+      File keytabFile = new File(keytab);
+      if (!keytabFile.exists() || !keytabFile.isFile()) {
+        throw new IllegalArgumentException("Provided keytab is not a normal file: " + keytab);
+      }
+      try {
+        UserGroupInformation.loginUserFromKeytab(getUserName(), keytabFile.getAbsolutePath());
+        return new KerberosToken();
+      } catch (IOException e) {
+        throw new RuntimeException("Failed to login", e);
+      }
+    }
+    throw new IllegalArgumentException("Must provide password or keytab in configuration");
+  }
+
+  /**
+   * Gets an Accumulo instance object, built from the configured instance name and ZooKeepers. The same instance is reused after the first call.
+   *
+   * @return instance
+   */
+  public Instance getInstance() {
+    if (instance == null) {
+      // locals are named so that they do not shadow the instance field
+      String instanceName = p.getProperty(KEY_INSTANCE);
+      String zookeepers = p.getProperty(KEY_ZOOKEEPERS);
+      instance = new ZooKeeperInstance(ClientConfiguration.loadDefault().withInstance(instanceName).withZkHosts(zookeepers));
+    }
+    return instance;
+  }
+
+  /**
+   * Gets an Accumulo connector for the configured user. The same connector is reused after the first call.
+   *
+   * @return connector
+   * @throws AccumuloException
+   *           if the instance fails to provide a connector
+   * @throws AccumuloSecurityException
+   *           if the instance rejects the configured user or token
+   */
+  public Connector getConnector() throws AccumuloException, AccumuloSecurityException {
+    if (connector == null) {
+      connector = getInstance().getConnector(getUserName(), getToken());
+    }
+    return connector;
+  }
+
+  /**
+   * Gets a multitable batch writer. The same object is reused after the first call unless it is reset.
+   *
+   * @return multitable batch writer
+   * @throws NumberFormatException
+   *           if any of the numeric batch writer configuration properties is missing or cannot be parsed
+   * @throws AccumuloException
+   *           if thrown while obtaining the connector
+   * @throws AccumuloSecurityException
+   *           if thrown while obtaining the connector
+   */
+  public MultiTableBatchWriter getMultiTableBatchWriter() throws AccumuloException, AccumuloSecurityException {
+    if (mtbw == null) {
+      long maxMem = Long.parseLong(p.getProperty(KEY_MAX_MEM));
+      long maxLatency = Long.parseLong(p.getProperty(KEY_MAX_LATENCY));
+      int numThreads = Integer.parseInt(p.getProperty(KEY_NUM_THREADS));
+      mtbw = getConnector().createMultiTableBatchWriter(
+          new BatchWriterConfig().setMaxMemory(maxMem).setMaxLatency(maxLatency, TimeUnit.MILLISECONDS).setMaxWriteThreads(numThreads));
+    }
+    return mtbw;
+  }
+
+  /**
+   * Checks if a multitable batch writer has been created by this wrapper.
+   *
+   * @return true if multitable batch writer is already created
+   */
+  public boolean isMultiTableBatchWriterInitialized() {
+    return mtbw != null;
+  }
+
+  /**
+   * Clears the multitable batch writer previously created and remembered by this wrapper. The writer itself is not closed here; a warning is logged if it is
+   * still open when it is dropped.
+   */
+  public void resetMultiTableBatchWriter() {
+    if (mtbw == null) {
+      return;
+    }
+    if (!mtbw.isClosed()) {
+      log.warn("Setting non-closed MultiTableBatchWriter to null (leaking resources)");
+    }
+    mtbw = null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/Fixture.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/Fixture.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/Fixture.java
new file mode 100644
index 0000000..5ac280e
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/Fixture.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.randomwalk;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Base class for randomwalk fixtures. Subclasses implement {@link #setUp(State, Environment)} and {@link #tearDown(State, Environment)}.
+ * NOTE(review): the code that invokes these hooks is not part of this file; presumably the framework calls them around a module's walk - confirm.
+ */
+public abstract class Fixture {
+
+  /** Logger named after the concrete subclass, since it is looked up with {@code this.getClass()}. */
+  protected final Logger log = Logger.getLogger(this.getClass());
+
+  /**
+   * Set-up hook to be implemented by each fixture.
+   *
+   * @param state
+   *          randomwalk state handed to the fixture
+   * @param env
+   *          randomwalk test environment handed to the fixture
+   * @throws Exception
+   *           if the implementation fails
+   */
+  public abstract void setUp(State state, Environment env) throws Exception;
+
+  /**
+   * Tear-down hook to be implemented by each fixture.
+   *
+   * @param state
+   *          randomwalk state handed to the fixture
+   * @param env
+   *          randomwalk test environment handed to the fixture
+   * @throws Exception
+   *           if the implementation fails
+   */
+  public abstract void tearDown(State state, Environment env) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/Framework.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/Framework.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/Framework.java
new file mode 100644
index 0000000..1a5700e
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/Framework.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.randomwalk;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.util.HashMap;
+import java.util.Properties;
+
+import org.apache.log4j.Logger;
+
+import com.beust.jcommander.Parameter;
+
+/**
+ * Command-line entry point and node registry for randomwalk. Nodes are created on first request and cached by id; a shared instance is available through
+ * {@link #getInstance()}.
+ */
+public class Framework {
+
+  private static final Logger log = Logger.getLogger(Framework.class);
+  // cache of already created nodes, keyed by the id passed to getNode
+  private final HashMap<String,Node> nodes = new HashMap<>();
+  private static final Framework INSTANCE = new Framework();
+
+  /**
+   * @return Singleton instance of Framework
+   */
+  public static Framework getInstance() {
+    return INSTANCE;
+  }
+
+  /**
+   * Run random walk framework
+   *
+   * @param startName
+   *          Full name of starting graph or test
+   * @param state
+   *          state passed to the starting node
+   * @param env
+   *          test environment passed to the starting node
+   * @return 0 if the starting node was visited without an exception, -1 if any exception was thrown (the exception is logged, not rethrown)
+   */
+  public int run(String startName, State state, Environment env) {
+    try {
+      getNode(startName).visit(state, env, new Properties());
+    } catch (Exception e) {
+      log.error("Error during random walk", e);
+      return -1;
+    }
+    return 0;
+  }
+
+  /**
+   * Creates node (if it does not already exist) and inserts into map. An id ending in ".xml" is loaded as a {@link Module} from the
+   * "/randomwalk/modules/" path; any other id is treated as the name of a {@link Test} class and instantiated reflectively.
+   *
+   * @param id
+   *          Name of node
+   * @return Node specified by id
+   * @throws Exception
+   *           if the module cannot be constructed or the test class cannot be loaded and instantiated
+   */
+  public Node getNode(String id) throws Exception {
+    // a single lookup is enough: null is never stored in the map
+    Node node = nodes.get(id);
+    if (node != null) {
+      return node;
+    }
+
+    // otherwise create and put in nodes
+    if (id.endsWith(".xml")) {
+      node = new Module(new File("/randomwalk/modules/" + id));
+    } else {
+      node = (Test) Class.forName(id).newInstance();
+    }
+    nodes.put(id, node);
+    return node;
+  }
+
+  /**
+   * Named command-line options. NOTE(review): {@link #main(String[])} in this class does not use these; it reads two positional arguments instead.
+   */
+  static class Opts extends org.apache.accumulo.core.cli.Help {
+    @Parameter(names = "--configDir", required = true, description = "directory containing the test configuration")
+    String configDir;
+    @Parameter(names = "--module", required = true, description = "the name of the module to run")
+    String module;
+  }
+
+  /**
+   * Loads the test properties from a file, runs the named module or test and exits the JVM with the value returned by
+   * {@link #run(String, State, Environment)}. Prints a usage message and exits with -1 unless exactly two arguments are given.
+   *
+   * @param args
+   *          two positional arguments: the path of the properties file and the name of the starting module or test
+   * @throws Exception
+   *           if the properties file cannot be opened or read
+   */
+  public static void main(String[] args) throws Exception {
+
+    if (args.length != 2) {
+      System.out.println("Usage: Framework <propsPath> <module>");
+      System.exit(-1);
+    }
+
+    Properties props = new Properties();
+    // try-with-resources closes the stream even when load() throws
+    try (FileInputStream fis = new FileInputStream(args[0])) {
+      props.load(fis);
+    }
+
+    State state = new State();
+    Environment env = new Environment(props);
+    int retval = getInstance().run(args[1], state, env);
+
+    System.exit(retval);
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/Module.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/Module.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/Module.java
new file mode 100644
index 0000000..1a3d059
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/Module.java
@@ -0,0 +1,624 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.randomwalk;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.xml.XMLConstants;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.validation.Schema;
+import javax.xml.validation.SchemaFactory;
+
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.util.SimpleThreadPool;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.NodeList;
+
+/**
+ * A module is a directed graph of tests
+ */
+public class Module extends Node {
+
+  private static final Logger log = Logger.getLogger(Module.class);
+
+  /**
+   * Placeholder node that performs no test work. When visited it only logs its own name, and only if the node properties contain a "print" entry naming
+   * the log level to use.
+   */
+  private class Dummy extends Node {
+
+    String name;
+
+    Dummy(String name) {
+      this.name = name;
+    }
+
+    @Override
+    public void visit(State state, Environment env, Properties props) {
+      final String requestedLevel = props.getProperty("print");
+      if (requestedLevel == null) {
+        // no "print" property: stay silent
+        return;
+      }
+      log.log(Level.toLevel(requestedLevel), name);
+    }
+
+    @Override
+    public String toString() {
+      return name;
+    }
+  }
+
+  /**
+   * Named indirection to another node of the module. An alias starts without a target; {@link #update(String)} points it at a node. Aliases are resolved
+   * by the module and must never be visited themselves.
+   */
+  private class Alias extends Node {
+
+    Node target;
+    String targetId;
+    String id;
+
+    Alias(String id) {
+      this.id = id;
+      this.target = null;
+    }
+
+    /**
+     * Points this alias at the node with the given id.
+     */
+    public void update(String node) throws Exception {
+      this.targetId = node;
+      this.target = Module.this.getNode(node);
+    }
+
+    /**
+     * @return the node this alias currently points at, or null if it was never updated
+     */
+    public Node get() {
+      return this.target;
+    }
+
+    /**
+     * @return the id of the node this alias currently points at
+     */
+    public String getTargetId() {
+      return this.targetId;
+    }
+
+    @Override
+    public void visit(State state, Environment env, Properties props) throws Exception {
+      throw new Exception("You don't visit aliases!");
+    }
+
+    @Override
+    public String toString() {
+      return this.id;
+    }
+  }
+
+  // Nodes known to this module, keyed by the id used in the module XML
+  private HashMap<String,Node> nodes = new HashMap<>();
+  // Properties per node id; the key "_init" holds the settings read from the <init> element
+  private HashMap<String,Properties> localProps = new HashMap<>();
+
+  /**
+   * One outgoing edge: the id of the neighbor node and the weight used when choosing among a node's edges.
+   */
+  private class Edge {
+    String nodeId;
+    int weight;
+  }
+
+  /**
+   * Outgoing edges of a single node together with the sum of their weights. Neighbors are drawn at random with probability proportional to edge weight.
+   */
+  private class AdjList {
+
+    private List<Edge> edges = new ArrayList<>();
+    private int totalWeight = 0;
+    // one generator per list, reused for every draw
+    private final Random rand = new Random();
+
+    /**
+     * Adds a neighbor node and weight of edge
+     */
+    private void addEdge(String nodeId, int weight) {
+
+      totalWeight += weight;
+
+      Edge e = new Edge();
+      e.nodeId = nodeId;
+      e.weight = weight;
+      edges.add(e);
+    }
+
+    /**
+     * Chooses a random neighbor node, weighted by edge weight
+     *
+     * @return id of the chosen neighbor node
+     * @throws Exception
+     *           if the total weight is not positive (for example when no edges were added), because no neighbor can be drawn
+     */
+    private String randomNeighbor() throws Exception {
+
+      // Random.nextInt requires a positive bound; report the real problem instead of its generic IllegalArgumentException
+      if (totalWeight <= 0) {
+        throw new Exception("Cannot choose a neighbor: " + edges.size() + " edge(s) with total weight " + totalWeight);
+      }
+
+      String nodeId = null;
+
+      // value in [1, totalWeight]; the first edge whose cumulative weight reaches it is selected
+      int randNum = rand.nextInt(totalWeight) + 1;
+      int sum = 0;
+
+      for (Edge e : edges) {
+        nodeId = e.nodeId;
+        sum += e.weight;
+        if (randNum <= sum) {
+          break;
+        }
+      }
+      return nodeId;
+    }
+  }
+
+  // package prefix -> package name (stored with a trailing "."), from the <package> elements
+  private HashMap<String,String> prefixes = new HashMap<>();
+  // node id -> weighted outgoing edges
+  private HashMap<String,AdjList> adjMap = new HashMap<>();
+  // node id -> keys ("alias.<name>") of the aliases declared inside that node
+  private HashMap<String,Set<String>> aliasMap = new HashMap<>();
+  // XML file this module was loaded from
+  private final File xmlFile;
+  // id of the node visited first, from the <init> element
+  private String initNodeId;
+  // optional fixture from the <fixture> element; null when the module declares none
+  private Fixture fixture = null;
+
+  /**
+   * Creates a module and immediately loads its definition from the given XML file.
+   *
+   * @param xmlFile
+   *          module definition file
+   * @throws Exception
+   *           if the file cannot be loaded
+   */
+  public Module(File xmlFile) throws Exception {
+    this.xmlFile = xmlFile;
+    loadFromXml();
+  }
+
+  /**
+   * Walks this module's graph: visits the init node, then repeatedly picks a weighted-random neighbor of the current node and visits it until the END node,
+   * the maxSec time limit or the maxHops limit is reached.
+   * <p>
+   * Every hop after the init node runs on a single-threaded executor so that the wait can be bounded by the seconds remaining. If a fixture is configured it
+   * is set up before the walk and, unless the "teardown" property says otherwise, torn down after the walk ends without an exception.
+   *
+   * @param state
+   *          random walk state passed to every visited node
+   * @param env
+   *          test environment passed to every visited node
+   * @param props
+   *          properties supplied by the caller; they are merged over the module's "_init" properties
+   * @throws Exception
+   *           if visiting a node fails or the walk reaches a node that has no outgoing edges
+   */
+  @Override
+  public void visit(final State state, final Environment env, Properties props) throws Exception {
+    int maxHops, maxSec;
+    boolean teardown;
+
+    Properties initProps = getProps("_init");
+    // NOTE(review): when "_init" is present, getProps returns the instance stored in localProps, so this merge also changes the stored properties
+    initProps.putAll(props);
+    String prop;
+    // absent, empty or "0" means no hop limit
+    if ((prop = initProps.getProperty("maxHops")) == null || prop.equals("0") || prop.equals(""))
+      maxHops = Integer.MAX_VALUE;
+    else
+      maxHops = Integer.parseInt(initProps.getProperty("maxHops", "0"));
+
+    // absent, empty or "0" means no time limit
+    if ((prop = initProps.getProperty("maxSec")) == null || prop.equals("0") || prop.equals(""))
+      maxSec = Integer.MAX_VALUE;
+    else
+      maxSec = Integer.parseInt(initProps.getProperty("maxSec", "0"));
+
+    // teardown is on unless the property is set to something other than "true" or ""
+    if ((prop = initProps.getProperty("teardown")) == null || prop.equals("true") || prop.equals(""))
+      teardown = true;
+    else
+      teardown = false;
+
+    if (fixture != null) {
+      fixture.setUp(state, env);
+    }
+
+    ExecutorService service = new SimpleThreadPool(1, "RandomWalk Runner");
+
+    try {
+      Node initNode = getNode(initNodeId);
+
+      // the init node is visited directly on the calling thread; only Test nodes are timed
+      boolean test = false;
+      if (initNode instanceof Test) {
+        startTimer(initNode);
+        test = true;
+      }
+      initNode.visit(state, env, getProps(initNodeId));
+      if (test)
+        stopTimer(initNode);
+
+      // update aliases
+      Set<String> aliases;
+      if ((aliases = aliasMap.get(initNodeId)) != null)
+        for (String alias : aliases) {
+          ((Alias) nodes.get(alias)).update(initNodeId);
+        }
+
+      String curNodeId = initNodeId;
+      int numHops = 0;
+      long startTime = System.currentTimeMillis() / 1000;
+      while (true) {
+        // check if END state was reached
+        if (curNodeId.equalsIgnoreCase("END")) {
+          log.debug("reached END state");
+          break;
+        }
+        // check if maxSec was reached
+        long curTime = System.currentTimeMillis() / 1000;
+        if ((curTime - startTime) > maxSec) {
+          log.debug("reached maxSec(" + maxSec + ")");
+          break;
+        }
+
+        // The number of seconds before the test should exit
+        long secondsRemaining = maxSec - (curTime - startTime);
+
+        // check if maxHops was reached
+        // NOTE(review): the check runs before the increment and uses '>', so maxHops + 1 hops are taken before it triggers - confirm this is intended
+        if (numHops > maxHops) {
+          log.debug("reached maxHops(" + maxHops + ")");
+          break;
+        }
+        numHops++;
+
+        if (!adjMap.containsKey(curNodeId) && !curNodeId.startsWith("alias.")) {
+          throw new Exception("Reached node(" + curNodeId + ") without outgoing edges in module(" + this + ")");
+        }
+        // NOTE(review): adj is null when curNodeId starts with "alias." and has no adjMap entry; the next line would then throw a NullPointerException
+        AdjList adj = adjMap.get(curNodeId);
+        String nextNodeId = adj.randomNeighbor();
+        final Node nextNode;
+        Node nextNodeOrAlias = getNode(nextNodeId);
+        // an alias is replaced by the node (and node id) it currently points at
+        if (nextNodeOrAlias instanceof Alias) {
+          nextNodeId = ((Alias) nextNodeOrAlias).getTargetId();
+          nextNode = ((Alias) nextNodeOrAlias).get();
+        } else {
+          nextNode = nextNodeOrAlias;
+        }
+        final Properties nodeProps = getProps(nextNodeId);
+        try {
+          test = false;
+          if (nextNode instanceof Test) {
+            startTimer(nextNode);
+            test = true;
+          }
+
+          // Wrap the visit of the next node in the module in a callable that returns a thrown exception
+          FutureTask<Exception> task = new FutureTask<>(new Callable<Exception>() {
+
+            @Override
+            public Exception call() throws Exception {
+              try {
+                nextNode.visit(state, env, nodeProps);
+                return null;
+              } catch (Exception e) {
+                return e;
+              }
+            }
+
+          });
+
+          // Run the task (should execute immediately)
+          service.submit(task);
+
+          Exception nodeException;
+          // NOTE(review): the break and throw paths below skip stopTimer(), so a timer started for a Test node is left running
+          try {
+            // Bound the time we'll wait for the node to complete
+            nodeException = task.get(secondsRemaining, TimeUnit.SECONDS);
+          } catch (InterruptedException e) {
+            log.warn("Interrupted waiting for " + nextNode.getClass().getSimpleName() + " to complete. Exiting.", e);
+            break;
+          } catch (ExecutionException e) {
+            log.error("Caught error executing " + nextNode.getClass().getSimpleName(), e);
+            throw e;
+          } catch (TimeoutException e) {
+            log.info("Timed out waiting for " + nextNode.getClass().getSimpleName() + " to complete (waited " + secondsRemaining + " seconds). Exiting.", e);
+            break;
+          }
+
+          // The RandomWalk node threw an Exception that the Callable handed back
+          // Throw it and let the Module perform cleanup
+          if (null != nodeException) {
+            throw nodeException;
+          }
+
+          if (test)
+            stopTimer(nextNode);
+        } catch (Exception e) {
+          // dump the user, node properties, configuration and state at debug level before rethrowing
+          log.debug("Connector belongs to user: " + env.getConnector().whoami());
+          log.debug("Exception occured at: " + System.currentTimeMillis());
+          log.debug("Properties for node: " + nextNodeId);
+          for (Entry<Object,Object> entry : nodeProps.entrySet()) {
+            log.debug("  " + entry.getKey() + ": " + entry.getValue());
+          }
+          log.debug("Overall Configuration Properties");
+          for (Entry<Object,Object> entry : env.copyConfigProperties().entrySet()) {
+            log.debug("  " + entry.getKey() + ": " + entry.getValue());
+          }
+          log.debug("State information");
+          for (String key : new TreeSet<>(state.getMap().keySet())) {
+            Object value = state.getMap().get(key);
+            String logMsg = "  " + key + ": ";
+            if (value == null)
+              logMsg += "null";
+            else if (value instanceof String || value instanceof Map || value instanceof Collection || value instanceof Number)
+              logMsg += value;
+            else if (value instanceof byte[])
+              logMsg += new String((byte[]) value, UTF_8);
+            // NOTE(review): this writes the token's password to the debug log - confirm that is acceptable
+            else if (value instanceof PasswordToken)
+              logMsg += new String(((PasswordToken) value).getPassword(), UTF_8);
+            else
+              logMsg += value.getClass() + " - " + value;
+
+            log.debug(logMsg);
+          }
+          throw new Exception("Error running node " + nextNodeId, e);
+        }
+
+        // update aliases
+        if ((aliases = aliasMap.get(curNodeId)) != null)
+          for (String alias : aliases) {
+            ((Alias) nodes.get(alias)).update(curNodeId);
+          }
+
+        curNodeId = nextNodeId;
+      }
+    } finally {
+      if (null != service) {
+        service.shutdownNow();
+      }
+    }
+
+    // only reached when the walk ended without an exception
+    if (teardown && (fixture != null)) {
+      log.debug("tearing down module");
+      fixture.tearDown(state, env);
+    }
+  }
+
+  // watchdog thread created by startTimer() and stopped by stopTimer()
+  Thread timer = null;
+  // watchdog delay in milliseconds (5 minutes)
+  final int time = 5 * 1000 * 60;
+  // set by the watchdog once it has warned that the current node is running long
+  AtomicBoolean runningLong = new AtomicBoolean(false);
+  // time (ms since epoch) at which the watchdog thread began running
+  long systemTime;
+
+  /**
+   *
+   */
+  private void startTimer(final Node initNode) {
+    runningLong.set(false);
+    timer = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          systemTime = System.currentTimeMillis();
+          Thread.sleep(time);
+        } catch (InterruptedException ie) {
+          return;
+        }
+        long timeSinceLastProgress = System.currentTimeMillis() - initNode.lastProgress();
+        if (timeSinceLastProgress > time) {
+          log.warn("Node " + initNode + " has been running for " + timeSinceLastProgress / 1000.0 + " seconds. You may want to look into it.");
+          runningLong.set(true);
+        }
+      }
+    });
+    initNode.makingProgress();
+    timer.start();
+  }
+
+  /**
+   *
+   */
+  private void stopTimer(Node nextNode) {
+    synchronized (timer) {
+      timer.interrupt();
+      try {
+        timer.join();
+      } catch (InterruptedException e) {
+        log.error("Failed to join timer '" + timer.getName() + "'.", e);
+      }
+    }
+    if (runningLong.get())
+      log.warn("Node " + nextNode + ", which was running long, has now completed after " + (System.currentTimeMillis() - systemTime) / 1000.0 + " seconds");
+  }
+
+  /**
+   * @return the path of the XML file this module was loaded from
+   */
+  @Override
+  public String toString() {
+    return xmlFile.getPath();
+  }
+
+  private String getFullName(String name) {
+
+    int index = name.indexOf(".");
+    if (index == -1 || name.endsWith(".xml")) {
+      return name;
+    }
+
+    String id = name.substring(0, index);
+
+    if (!prefixes.containsKey(id)) {
+      log.warn("Id (" + id + ") was not found in prefixes");
+      return name;
+    }
+
+    return prefixes.get(id).concat(name.substring(index + 1));
+  }
+
+  private Node createNode(String id, String src) throws Exception {
+
+    // check if id indicates dummy node
+    if (id.equalsIgnoreCase("END") || id.startsWith("dummy")) {
+      if (nodes.containsKey(id) == false) {
+        nodes.put(id, new Dummy(id));
+      }
+      return nodes.get(id);
+    }
+
+    if (id.startsWith("alias")) {
+      if (nodes.containsKey(id) == false) {
+        nodes.put(id, new Alias(id));
+      }
+      return nodes.get(id);
+    }
+
+    // grab node from framework based on its id or src
+    Node node;
+    if (src == null || src.isEmpty()) {
+      node = Framework.getInstance().getNode(getFullName(id));
+    } else {
+      node = Framework.getInstance().getNode(getFullName(src));
+    }
+
+    // add to node to this module's map
+    nodes.put(id, node);
+
+    return node;
+  }
+
+  /**
+   * Returns the node registered under the given id. An unregistered "END" id (ignoring case) is registered as a Dummy node; any other unregistered id is
+   * resolved through the Framework without being stored in this module.
+   */
+  private Node getNode(String id) throws Exception {
+
+    if (!nodes.containsKey(id)) {
+      if (!id.equalsIgnoreCase("END")) {
+        return Framework.getInstance().getNode(getFullName(id));
+      }
+      nodes.put(id, new Dummy(id));
+    }
+
+    return nodes.get(id);
+  }
+
+  /**
+   * @return the properties stored for the given node id, or a new empty Properties when none are stored
+   */
+  private Properties getProps(String nodeId) {
+    return localProps.containsKey(nodeId) ? localProps.get(nodeId) : new Properties();
+  }
+
+  /**
+   * Parses the module's XML file and fills in the package prefixes, the fixture, the init settings and, for every node, its aliases, properties and
+   * weighted edges.
+   *
+   * @throws Exception
+   *           if the file cannot be parsed, has no init element, declares a node id twice, has a node without edges, or contains an alias, property or
+   *           edge that lacks a required attribute
+   */
+  private void loadFromXml() throws Exception {
+    DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
+
+    // set the schema
+    SchemaFactory sf = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
+    Schema moduleSchema = sf.newSchema(this.getClass().getClassLoader().getResource("randomwalk/module.xsd"));
+    dbf.setSchema(moduleSchema);
+
+    // parse the document
+    Document d;
+    try {
+      DocumentBuilder docbuilder = dbf.newDocumentBuilder();
+      d = docbuilder.parse(xmlFile);
+    } catch (Exception e) {
+      log.error("Failed to parse: " + xmlFile, e);
+      // keep the parser's exception as the cause so callers can see why parsing failed
+      throw new Exception("Failed to parse: " + xmlFile, e);
+    }
+
+    // parse packages
+    NodeList nodelist = d.getDocumentElement().getElementsByTagName("package");
+    for (int i = 0; i < nodelist.getLength(); i++) {
+      Element el = (Element) nodelist.item(i);
+      String value = el.getAttribute("value");
+      // store package names with a trailing dot so a class name can be appended directly
+      if (!value.endsWith(".")) {
+        value = value.concat(".");
+      }
+      prefixes.put(el.getAttribute("prefix"), value);
+    }
+
+    // parse fixture node
+    nodelist = d.getDocumentElement().getElementsByTagName("fixture");
+    if (nodelist.getLength() > 0) {
+      Element fixtureEl = (Element) nodelist.item(0);
+      fixture = (Fixture) Class.forName(getFullName(fixtureEl.getAttribute("id"))).newInstance();
+    }
+
+    // parse initial node
+    Element initEl = (Element) d.getDocumentElement().getElementsByTagName("init").item(0);
+    if (initEl == null) {
+      // fail with a clear message instead of a NullPointerException on the next line
+      throw new Exception("Module " + xmlFile + " has no init element");
+    }
+    initNodeId = initEl.getAttribute("id");
+    Properties initProps = new Properties();
+    // getAttribute returns "" (never null) for a missing attribute, and visit() treats "" as "not set"
+    initProps.setProperty("maxHops", initEl.getAttribute("maxHops"));
+    initProps.setProperty("maxSec", initEl.getAttribute("maxSec"));
+    initProps.setProperty("teardown", initEl.getAttribute("teardown"));
+    localProps.put("_init", initProps);
+
+    // parse all nodes
+    nodelist = d.getDocumentElement().getElementsByTagName("node");
+    for (int i = 0; i < nodelist.getLength(); i++) {
+
+      Element nodeEl = (Element) nodelist.item(i);
+
+      // get attributes
+      String id = nodeEl.getAttribute("id");
+      if (adjMap.containsKey(id)) {
+        throw new Exception("Module already contains: " + id);
+      }
+      String src = nodeEl.getAttribute("src");
+
+      // create node
+      createNode(id, src);
+
+      // set some attributes in properties for later use
+      Properties props = new Properties();
+      props.setProperty("maxHops", nodeEl.getAttribute("maxHops"));
+      props.setProperty("maxSec", nodeEl.getAttribute("maxSec"));
+      props.setProperty("teardown", nodeEl.getAttribute("teardown"));
+
+      // parse aliases
+      NodeList aliaslist = nodeEl.getElementsByTagName("alias");
+      Set<String> aliases = new TreeSet<>();
+      for (int j = 0; j < aliaslist.getLength(); j++) {
+        Element propEl = (Element) aliaslist.item(j);
+
+        if (!propEl.hasAttribute("name")) {
+          throw new Exception("Node " + id + " has alias with no identifying name");
+        }
+
+        String key = "alias." + propEl.getAttribute("name");
+
+        aliases.add(key);
+        createNode(key, null);
+      }
+      if (aliases.size() > 0)
+        aliasMap.put(id, aliases);
+
+      // parse properties of nodes
+      NodeList proplist = nodeEl.getElementsByTagName("property");
+      for (int j = 0; j < proplist.getLength(); j++) {
+        Element propEl = (Element) proplist.item(j);
+
+        if (!propEl.hasAttribute("key") || !propEl.hasAttribute("value")) {
+          throw new Exception("Node " + id + " has property with no key or value");
+        }
+
+        String key = propEl.getAttribute("key");
+
+        // these three are reserved for the node's own attributes, set above
+        if (key.equals("maxHops") || key.equals("maxSec") || key.equals("teardown")) {
+          throw new Exception("The following property can only be set in attributes: " + key);
+        }
+
+        props.setProperty(key, propEl.getAttribute("value"));
+      }
+      localProps.put(id, props);
+
+      // parse edges of nodes
+      AdjList edges = new AdjList();
+      adjMap.put(id, edges);
+      NodeList edgelist = nodeEl.getElementsByTagName("edge");
+      if (edgelist.getLength() == 0) {
+        throw new Exception("Node " + id + " has no edges!");
+      }
+      for (int j = 0; j < edgelist.getLength(); j++) {
+        Element edgeEl = (Element) edgelist.item(j);
+
+        String edgeID = edgeEl.getAttribute("id");
+
+        if (!edgeEl.hasAttribute("weight")) {
+          throw new Exception("Edge with id=" + edgeID + " is missing weight");
+        }
+
+        int weight = Integer.parseInt(edgeEl.getAttribute("weight"));
+        edges.addEdge(edgeID, weight);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/Node.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/Node.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/Node.java
new file mode 100644
index 0000000..b2c2f97
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/Node.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.randomwalk;
+
+import java.io.File;
+import java.util.Properties;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Represents a point in the graph of the random walk framework
+ */
+public abstract class Node {
+
+  protected final Logger log = Logger.getLogger(this.getClass());
+  // time (ms since epoch) of the last reported progress; starts at construction time
+  long progress = System.currentTimeMillis();
+
+  /**
+   * Visits node
+   *
+   * @param state
+   *          Random walk state passed between nodes
+   * @param env
+   *          test environment
+   * @param props
+   *          properties configured for this node
+   */
+  public abstract void visit(State state, Environment env, Properties props) throws Exception;
+
+  /**
+   * Compares by string form: this node equals any non-null object whose toString() matches this node's toString().
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (o == null)
+      return false;
+    return toString().equals(o.toString());
+  }
+
+  @Override
+  public String toString() {
+    return this.getClass().getName();
+  }
+
+  @Override
+  public int hashCode() {
+    return toString().hashCode();
+  }
+
+  /**
+   * Records that the node made progress just now.
+   */
+  synchronized public void makingProgress() {
+    progress = System.currentTimeMillis();
+  }
+
+  /**
+   * @return the time (ms since epoch) progress was last recorded
+   */
+  synchronized public long lastProgress() {
+    return progress;
+  }
+
+  /**
+   * Builds a comma-separated list of jar paths: every "zookeeper-*jar" file found directly in ZOOKEEPER_HOME, followed by a fixed set of jars under
+   * ACCUMULO_HOME/lib.
+   *
+   * @return comma-separated jar paths
+   * @throws RuntimeException
+   *           if ACCUMULO_HOME or ZOOKEEPER_HOME is not set
+   */
+  protected String getMapReduceJars() {
+
+    String acuHome = System.getenv("ACCUMULO_HOME");
+    String zkHome = System.getenv("ZOOKEEPER_HOME");
+
+    if (acuHome == null || zkHome == null) {
+      throw new RuntimeException("ACCUMULO or ZOOKEEPER home not set!");
+    }
+
+    // A builder (rather than += on a possibly-null String) keeps the literal "null" out of the result when no zookeeper jar is found
+    StringBuilder jars = new StringBuilder();
+
+    File zkLib = new File(zkHome);
+    String[] files = zkLib.list();
+    if (files != null) {
+      for (String f : files) {
+        if (f.matches("^zookeeper-.+jar$")) {
+          appendJar(jars, String.format("%s/%s", zkLib.getAbsolutePath(), f));
+        }
+      }
+    }
+
+    File libdir = new File(acuHome + "/lib");
+    for (String jar : "accumulo-core accumulo-server-base accumulo-fate accumulo-trace commons-math3 libthrift htrace-core".split(" ")) {
+      appendJar(jars, String.format("%s/%s.jar", libdir.getAbsolutePath(), jar));
+    }
+
+    return jars.toString();
+  }
+
+  /**
+   * Appends a path to the list, inserting a comma separator when the list is not empty.
+   */
+  private static void appendJar(StringBuilder jars, String path) {
+    if (jars.length() > 0) {
+      jars.append(',');
+    }
+    jars.append(path);
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/State.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/State.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/State.java
new file mode 100644
index 0000000..b619674
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/State.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.randomwalk;
+
+import java.util.HashMap;
+
+/**
+ * Key/value store for whatever a test needs to remember between node visits. This class is not thread-safe.
+ */
+public class State {
+
+  private HashMap<String,Object> entries = new HashMap<>();
+
+  /**
+   * Builds a state with no entries.
+   */
+  public State() {}
+
+  /**
+   * Stores a value under a key, replacing any previous value.
+   *
+   * @param key
+   *          key for object
+   * @param value
+   *          object
+   */
+  public void set(String key, Object value) {
+    entries.put(key, value);
+  }
+
+  /**
+   * Drops the entry for a key, if there is one.
+   *
+   * @param key
+   *          key for object
+   */
+  public void remove(String key) {
+    entries.remove(key);
+  }
+
+  /**
+   * Fetches the value for a key that must be present.
+   *
+   * @param key
+   *          key for object
+   * @return value object
+   * @throws RuntimeException
+   *           if state object is not present
+   */
+  public Object get(String key) {
+    if (!entries.containsKey(key)) {
+      throw new RuntimeException("State does not contain " + key);
+    }
+    return entries.get(key);
+  }
+
+  /**
+   * Fetches the value for a key, tolerating a missing entry.
+   *
+   * @param key
+   *          key for object
+   * @return value object, or null if not present
+   */
+  public Object getOkIfAbsent(String key) {
+    return entries.get(key);
+  }
+
+  /**
+   * Exposes the backing map itself, not a copy: changes made through it change the state.
+   *
+   * @return state map
+   */
+  HashMap<String,Object> getMap() {
+    return entries;
+  }
+
+  /**
+   * Fetches the value for a key as a string; a missing entry yields null.
+   *
+   * @param key
+   *          key for object
+   * @return value as string
+   * @throws ClassCastException
+   *           if the value object is not a string
+   */
+  public String getString(String key) {
+    final Object value = entries.get(key);
+    return (String) value;
+  }
+
+  /**
+   * Fetches the value for a key as an integer; a missing entry yields null.
+   *
+   * @param key
+   *          key for object
+   * @return value as integer
+   * @throws ClassCastException
+   *           if the value object is not an integer
+   */
+  public Integer getInteger(String key) {
+    final Object value = entries.get(key);
+    return (Integer) value;
+  }
+
+  /**
+   * Fetches the value for a key as a long; a missing entry yields null.
+   *
+   * @param key
+   *          key for object
+   * @return value as long
+   * @throws ClassCastException
+   *           if the value object is not a long
+   */
+  public Long getLong(String key) {
+    final Object value = entries.get(key);
+    return (Long) value;
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/Test.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/Test.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/Test.java
new file mode 100644
index 0000000..a8e117a
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/Test.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.randomwalk;
+
+/**
+ * Base class for user-written tests. A test performs actions on Accumulo and is a node of the random walk graph.
+ */
+public abstract class Test extends Node {
+
+  /**
+   * @return the name of the concrete test class
+   */
+  @Override
+  public String toString() {
+    return this.getClass().getName();
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/bulk/BulkImportTest.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/bulk/BulkImportTest.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/bulk/BulkImportTest.java
new file mode 100644
index 0000000..317a294
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/bulk/BulkImportTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.randomwalk.bulk;
+
+import java.util.Properties;
+
+import org.apache.accumulo.testing.core.randomwalk.Environment;
+import org.apache.accumulo.testing.core.randomwalk.State;
+
+/**
+ * If we have a sufficient back-up of imports, let them work off before adding even more bulk-imports. Imports of PlusOne must always be balanced with imports
+ * of MinusOne.
+ */
+public abstract class BulkImportTest extends BulkTest {
+
+  public static final String SKIPPED_IMPORT = "skipped.import", TRUE = Boolean.TRUE.toString(), FALSE = Boolean.FALSE.toString();
+
+  /**
+   * Decides whether this import should run. A skipped BulkPlusOne leaves a marker in the state; the following BulkMinusOne sees the marker, skips itself
+   * as well and waits 30 seconds, so that increments and decrements stay balanced.
+   *
+   * @throws IllegalStateException
+   *           if the skip marker is present while visiting a class other than BulkMinusOne
+   */
+  @Override
+  public void visit(final State state, Environment env, Properties props) throws Exception {
+    /*
+     * Each visit() is performed sequentially and then submitted to the threadpool which will have async execution. As long as we're checking the state and
+     * making decisions about what to do before we submit something to the thread pool, we're fine.
+     */
+
+    String lastImportSkipped = state.getString(SKIPPED_IMPORT);
+    // We have a marker in the state for the previous insert, we have to balance skipping BulkPlusOne
+    // with skipping the new BulkMinusOne to make sure that we maintain consistency
+    if (null != lastImportSkipped) {
+      if (!getClass().equals(BulkMinusOne.class)) {
+        throw new IllegalStateException("Should not have a skipped import marker for a class other than " + BulkMinusOne.class.getName() + " but was "
+            + getClass().getName());
+      }
+
+      if (TRUE.equals(lastImportSkipped)) {
+        log.debug("Last import was skipped, skipping this import to ensure consistency");
+        state.remove(SKIPPED_IMPORT);
+
+        // Wait 30s to balance the skip of a BulkPlusOne/BulkMinusOne pair
+        log.debug("Waiting 30s before continuing");
+        try {
+          Thread.sleep(30 * 1000);
+        } catch (InterruptedException e) {
+          // Stop waiting, but restore the interrupt status so the thread that runs this node can still see the interruption
+          Thread.currentThread().interrupt();
+        }
+
+        return;
+      } else {
+        // last import was not skipped, remove the marker
+        state.remove(SKIPPED_IMPORT);
+      }
+    }
+
+    if (shouldQueueMoreImports(state, env)) {
+      super.visit(state, env, props);
+    } else {
+      log.debug("Not queuing more imports this round because too many are already queued");
+      state.set(SKIPPED_IMPORT, TRUE);
+      // Don't sleep here, let the sleep happen when we skip the next BulkMinusOne
+    }
+  }
+
+  /**
+   * @return true for every class except BulkPlusOne; for BulkPlusOne the decision is delegated to SelectiveQueueing
+   */
+  private boolean shouldQueueMoreImports(State state, Environment env) throws Exception {
+    // Only selectively import when it's BulkPlusOne. If we did a BulkPlusOne,
+    // we must also do a BulkMinusOne to keep the table consistent
+    if (getClass().equals(BulkPlusOne.class)) {
+      // SelectiveQueueing decides whether more imports may be queued.
+      // NOTE(review): presumably it refuses once the queued tasks exceed about 50x the number of tservers - confirm against SelectiveQueueing
+      return SelectiveQueueing.shouldQueueOperation(state, env);
+    }
+
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/bulk/BulkMinusOne.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/bulk/BulkMinusOne.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/bulk/BulkMinusOne.java
new file mode 100644
index 0000000..a9bb8f9
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/bulk/BulkMinusOne.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.randomwalk.bulk;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.testing.core.randomwalk.Environment;
+import org.apache.accumulo.testing.core.randomwalk.State;
+
+/**
+ * Counterpart of {@link BulkPlusOne}: bulk loads the value "-1" through {@link BulkPlusOne#bulkLoadLots}.
+ */
+public class BulkMinusOne extends BulkImportTest {
+
+  /** The UTF-8 encoded string "-1", written to every check column. */
+  private static final Value NEGATIVE_ONE = new Value("-1".getBytes(UTF_8));
+
+  /**
+   * Logs the decrement and delegates the actual bulk load to {@link BulkPlusOne}.
+   */
+  @Override
+  protected void runLater(State state, Environment env) throws Exception {
+    log.info("Decrementing");
+    BulkPlusOne.bulkLoadLots(log, state, env, NEGATIVE_ONE);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/bulk/BulkPlusOne.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/bulk/BulkPlusOne.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/bulk/BulkPlusOne.java
new file mode 100644
index 0000000..239e93e
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/bulk/BulkPlusOne.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.randomwalk.bulk;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.accumulo.core.client.IteratorSetting.Column;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVWriter;
+import org.apache.accumulo.core.file.rfile.RFile;
+import org.apache.accumulo.testing.core.randomwalk.Environment;
+import org.apache.accumulo.testing.core.randomwalk.State;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+
+/**
+ * Bulk loads the value "1" into every check column of rows 0 through {@code LOTS - 1}.
+ * <p>
+ * Also hosts the row/column constants and the {@link #bulkLoadLots} helper that {@link BulkMinusOne} reuses with a different value.
+ */
+public class BulkPlusOne extends BulkImportTest {
+
+  /** Number of rows written by each bulk load. */
+  public static final int LOTS = 100000;
+  /** Number of check columns written per row. */
+  public static final int COLS = 10;
+  /** Number of hex digits needed to print any row index below {@link #LOTS}. */
+  public static final int HEX_SIZE = (int) Math.ceil(Math.log(LOTS) / Math.log(16));
+  /** Row id format: the letter r followed by the zero-padded hex row index. */
+  public static final String FMT = "r%0" + HEX_SIZE + "x";
+  /** The check columns, all in {@link #CHECK_COLUMN_FAMILY} with three-digit decimal qualifiers. */
+  public static final List<Column> COLNAMES = new ArrayList<>();
+  /** Column family that holds the values written by the bulk loads. */
+  public static final Text CHECK_COLUMN_FAMILY = new Text("cf");
+  static {
+    for (int i = 0; i < COLS; i++) {
+      COLNAMES.add(new Column(CHECK_COLUMN_FAMILY, new Text(String.format("%03d", i))));
+    }
+  }
+  /** Column family of the per-import marker entry written to every row. */
+  public static final Text MARKER_CF = new Text("marker");
+  /** Source of the marker column qualifier; incremented once per bulk load. */
+  static final AtomicLong counter = new AtomicLong();
+
+  // Encode explicitly as UTF-8 (as BulkMinusOne does) instead of relying on the platform default charset
+  private static final Value ONE = new Value("1".getBytes(UTF_8));
+
+  /**
+   * Writes {@code value} for every check column of every row into a random number of RFiles and bulk imports them into the test table.
+   *
+   * @param log
+   *          logger of the calling test
+   * @param state
+   *          randomwalk state; must hold a {@link Random} under "rand" and a {@link FileSystem} under "fs"
+   * @param env
+   *          randomwalk environment used to obtain the connector
+   * @param value
+   *          value written to each check column
+   * @throws Exception
+   *           if writing or importing fails, or if the import leaves files in the failure directory
+   */
+  static void bulkLoadLots(Logger log, State state, Environment env, Value value) throws Exception {
+    // Unique source directory per import, with a sibling directory for files that fail to import
+    final Path dir = new Path("/tmp", "bulk_" + UUID.randomUUID().toString());
+    final Path fail = new Path(dir.toString() + "_fail");
+    final DefaultConfiguration defaultConfiguration = AccumuloConfiguration.getDefaultConfiguration();
+    final Random rand = (Random) state.get("rand");
+    final FileSystem fs = (FileSystem) state.get("fs");
+    fs.mkdirs(fail);
+    // Between 1 and 10 files per import
+    final int parts = rand.nextInt(10) + 1;
+
+    // Sorted, distinct start rows; row 0 is always present so the files cover the whole row space
+    TreeSet<Integer> startRows = new TreeSet<>();
+    startRows.add(0);
+    while (startRows.size() < parts)
+      startRows.add(rand.nextInt(LOTS));
+
+    List<String> printRows = new ArrayList<>(startRows.size());
+    for (Integer row : startRows)
+      printRows.add(String.format(FMT, row));
+
+    String markerColumnQualifier = String.format("%07d", counter.incrementAndGet());
+    log.debug("preparing bulk files with start rows " + printRows + " last row " + String.format(FMT, LOTS - 1) + " marker " + markerColumnQualifier);
+
+    // Append LOTS as the exclusive end of the last file so file i covers rows [rows(i), rows(i + 1))
+    List<Integer> rows = new ArrayList<>(startRows);
+    rows.add(LOTS);
+
+    for (int i = 0; i < parts; i++) {
+      String fileName = dir + "/" + String.format("part_%d.", i) + RFile.EXTENSION;
+      FileSKVWriter f = FileOperations.getInstance().newWriterBuilder().forFile(fileName, fs, fs.getConf()).withTableConfiguration(defaultConfiguration)
+          .build();
+      f.startDefaultLocalityGroup();
+      int start = rows.get(i);
+      int end = rows.get(i + 1);
+      for (int j = start; j < end; j++) {
+        Text row = new Text(String.format(FMT, j));
+        for (Column col : COLNAMES) {
+          f.append(new Key(row, col.getColumnFamily(), col.getColumnQualifier()), value);
+        }
+        // One marker entry per row, qualified by this import's counter value
+        f.append(new Key(row, MARKER_CF, new Text(markerColumnQualifier)), ONE);
+      }
+      f.close();
+    }
+    env.getConnector().tableOperations().importDirectory(Setup.getTableName(), dir.toString(), fail.toString(), true);
+    fs.delete(dir, true);
+    // Anything left in the failure directory means the import was incomplete
+    FileStatus[] failures = fs.listStatus(fail);
+    if (failures != null && failures.length > 0) {
+      state.set("bulkImportSuccess", "false");
+      throw new Exception(failures.length + " failure files found importing files from " + dir);
+    }
+    fs.delete(fail, true);
+    log.debug("Finished bulk import, start rows " + printRows + " last row " + String.format(FMT, LOTS - 1) + " marker " + markerColumnQualifier);
+  }
+
+  /**
+   * Logs the increment and bulk loads the value "1".
+   */
+  @Override
+  protected void runLater(State state, Environment env) throws Exception {
+    log.info("Incrementing");
+    bulkLoadLots(log, state, env, ONE);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/bulk/BulkTest.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/bulk/BulkTest.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/bulk/BulkTest.java
new file mode 100644
index 0000000..dc11501
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/bulk/BulkTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.randomwalk.bulk;
+
+import java.util.Properties;
+
+import org.apache.accumulo.testing.core.randomwalk.Environment;
+import org.apache.accumulo.testing.core.randomwalk.State;
+import org.apache.accumulo.testing.core.randomwalk.Test;
+
+public abstract class BulkTest extends Test {
+
+  @Override
+  public void visit(final State state, final Environment env, Properties props) throws Exception {
+    Setup.run(state, new Runnable() {
+      @Override
+      public void run() {
+        try {
+          runLater(state, env);
+        } catch (Throwable ex) {
+          log.error(ex, ex);
+        }
+      }
+
+    });
+  }
+
+  abstract protected void runLater(State state, Environment env) throws Exception;
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/bulk/Compact.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/bulk/Compact.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/bulk/Compact.java
new file mode 100644
index 0000000..356a7c9
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/bulk/Compact.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.randomwalk.bulk;
+
+import org.apache.accumulo.testing.core.randomwalk.Environment;
+import org.apache.accumulo.testing.core.randomwalk.State;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Compacts a randomly chosen row range of the bulk test table.
+ */
+public class Compact extends SelectiveBulkTest {
+
+  /**
+   * Picks a random range via {@link Merge#getRandomTabletRange(State)}, compacts it and logs before and after.
+   */
+  @Override
+  protected void runLater(State state, Environment env) throws Exception {
+    final Text[] range = Merge.getRandomTabletRange(state);
+    final String description = Merge.rangeToString(range);
+    final Text start = range[0];
+    final Text end = range[1];
+
+    log.info("Compacting " + description);
+    env.getConnector().tableOperations().compact(Setup.getTableName(), start, end, false, true);
+    log.info("Compaction " + description + " finished");
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/bulk/ConsistencyCheck.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/bulk/ConsistencyCheck.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/bulk/ConsistencyCheck.java
new file mode 100644
index 0000000..eb21f30
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/bulk/ConsistencyCheck.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.randomwalk.bulk;
+
+import java.util.Map.Entry;
+import java.util.Random;
+
+import org.apache.accumulo.core.client.IsolatedScanner;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.testing.core.randomwalk.Environment;
+import org.apache.accumulo.testing.core.randomwalk.State;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Scans the check columns of one random row and fails if they do not all hold the same value.
+ */
+public class ConsistencyCheck extends SelectiveBulkTest {
+
+  /**
+   * Reads {@link BulkPlusOne#CHECK_COLUMN_FAMILY} of a random row through an isolated scanner and compares every value with the first one read.
+   *
+   * @throws RuntimeException
+   *           if any value differs from the first value read
+   */
+  @Override
+  protected void runLater(State state, Environment env) throws Exception {
+    final Random random = (Random) state.get("rand");
+    final Text row = Merge.getRandomRow(random);
+    log.info("Checking " + row);
+
+    final String principal = env.getConnector().whoami();
+    final Authorizations auths = env.getConnector().securityOperations().getUserAuthorizations(principal);
+    try (Scanner scanner = new IsolatedScanner(env.getConnector().createScanner(Setup.getTableName(), auths))) {
+      scanner.setRange(new Range(row));
+      scanner.fetchColumnFamily(BulkPlusOne.CHECK_COLUMN_FAMILY);
+
+      Value expected = null;
+      Key firstKey = null;
+      for (Entry<Key,Value> entry : scanner) {
+        final Value actual = entry.getValue();
+        // The first entry read defines the value all later entries must match
+        if (expected == null) {
+          expected = actual;
+          firstKey = entry.getKey();
+        }
+        if (expected.equals(actual)) {
+          continue;
+        }
+        throw new RuntimeException("Inconsistent value at " + entry.getKey() + " was " + actual + " should be " + expected + " first read at " + firstKey);
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/bulk/Merge.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/bulk/Merge.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/bulk/Merge.java
new file mode 100644
index 0000000..ebce171
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/bulk/Merge.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.randomwalk.bulk;
+
+import java.util.Arrays;
+import java.util.Random;
+
+import org.apache.accumulo.testing.core.randomwalk.Environment;
+import org.apache.accumulo.testing.core.randomwalk.State;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Merges a randomly chosen row range of the bulk test table; also provides the random row/range helpers used by sibling tests.
+ */
+public class Merge extends SelectiveBulkTest {
+
+  /**
+   * Picks a random range, merges it and logs before and after.
+   */
+  @Override
+  protected void runLater(State state, Environment env) throws Exception {
+    final Text[] range = getRandomTabletRange(state);
+    final String description = rangeToString(range);
+    log.info("merging " + description);
+    env.getConnector().tableOperations().merge(Setup.getTableName(), range[0], range[1]);
+    log.info("merging " + description + " complete");
+  }
+
+  /**
+   * Renders a two-element range as {@code (start -> end]}, printing a {@code null} start as -inf and a {@code null} end as +inf.
+   */
+  public static String rangeToString(Text[] points) {
+    final Object lower = points[0] == null ? "-inf" : points[0];
+    final Object upper = points[1] == null ? "+inf" : points[1];
+    return "(" + lower + " -> " + upper + "]";
+  }
+
+  /**
+   * Returns a row id formatted with {@link BulkPlusOne#FMT} for a random index in {@code [0, BulkPlusOne.LOTS)}.
+   */
+  public static Text getRandomRow(Random rand) {
+    // Clear the sign bit so the remainder is never negative
+    final long index = (rand.nextLong() & Long.MAX_VALUE) % BulkPlusOne.LOTS;
+    return new Text(String.format(BulkPlusOne.FMT, index));
+  }
+
+  /**
+   * Returns a sorted pair of random rows in which either end, or both, is occasionally replaced by {@code null}.
+   */
+  public static Text[] getRandomTabletRange(State state) {
+    final Random rand = (Random) state.get("rand");
+    final Text[] range = new Text[] {getRandomRow(rand), getRandomRow(rand)};
+    Arrays.sort(range);
+
+    // Draw all three decisions up front, in this order, so the random sequence is consumed the same way every call
+    final boolean dropStart = rand.nextInt(10) == 0;
+    final boolean dropEnd = rand.nextInt(10) == 0;
+    final boolean dropBoth = rand.nextInt(20) == 0;
+
+    if (dropStart || dropBoth) {
+      range[0] = null;
+    }
+    if (dropEnd || dropBoth) {
+      range[1] = null;
+    }
+    return range;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/bulk/SelectiveBulkTest.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/bulk/SelectiveBulkTest.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/bulk/SelectiveBulkTest.java
new file mode 100644
index 0000000..a708942
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/bulk/SelectiveBulkTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.randomwalk.bulk;
+
+import java.util.Properties;
+
+import org.apache.accumulo.testing.core.randomwalk.Environment;
+import org.apache.accumulo.testing.core.randomwalk.State;
+
+/**
+ * Selectively runs the actual {@link BulkTest} based on the number of active TServers and the number of queued operations.
+ */
+public abstract class SelectiveBulkTest extends BulkTest {
+
+  /**
+   * Queues the test if {@link SelectiveQueueing} allows it; otherwise logs the skip and pauses for 30 seconds.
+   * <p>
+   * If the pause is interrupted, the thread's interrupt flag is set again before returning so the caller can still observe the interrupt.
+   */
+  @Override
+  public void visit(State state, Environment env, Properties props) throws Exception {
+    if (SelectiveQueueing.shouldQueueOperation(state, env)) {
+      super.visit(state, env, props);
+    } else {
+      log.debug("Skipping queueing of " + getClass().getSimpleName() + " because of excessive queued tasks already");
+      log.debug("Waiting 30 seconds before continuing");
+      try {
+        Thread.sleep(30 * 1000);
+      } catch (InterruptedException e) {
+        // Cut the wait short, but restore the flag instead of silently dropping the interrupt
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/bulk/SelectiveQueueing.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/bulk/SelectiveQueueing.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/bulk/SelectiveQueueing.java
new file mode 100644
index 0000000..59cf8aa
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/bulk/SelectiveQueueing.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.randomwalk.bulk;
+
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.testing.core.randomwalk.Environment;
+import org.apache.accumulo.testing.core.randomwalk.State;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Chooses whether or not an operation should be queued based on the current thread pool queue length and the number of available TServers.
+ */
+public class SelectiveQueueing {
+  private static final Logger log = LoggerFactory.getLogger(SelectiveQueueing.class);
+
+  /**
+   * Compares the number of tasks waiting in the pool stored under "pool" in {@code state} with the current number of tablet servers.
+   *
+   * @return {@code true} if another operation may be queued; {@code false} (after logging the backlog) otherwise
+   */
+  public static boolean shouldQueueOperation(State state, Environment env) throws Exception {
+    final ThreadPoolExecutor executor = (ThreadPoolExecutor) state.get("pool");
+    // Waiting tasks = everything submitted minus what is running and what has finished
+    final long submitted = executor.getTaskCount();
+    final long running = executor.getActiveCount();
+    final long finished = executor.getCompletedTaskCount();
+    final long backlog = submitted - running - finished;
+
+    final Connector connector = env.getConnector();
+    final int tserverCount = connector.instanceOperations().getTabletServers().size();
+
+    if (shouldQueue(backlog, tserverCount)) {
+      return true;
+    }
+
+    log.info("Not queueing because of " + backlog + " outstanding tasks");
+    return false;
+  }
+
+  /**
+   * Allows queueing while fewer than 50 tasks per tablet server are waiting.
+   */
+  private static boolean shouldQueue(long queuedThreads, int numTservers) {
+    final int limit = numTservers * 50;
+    return queuedThreads < limit;
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/bulk/Setup.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/bulk/Setup.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/bulk/Setup.java
new file mode 100644
index 0000000..f3c3fdf
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/bulk/Setup.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.randomwalk.bulk;
+
+import java.net.InetAddress;
+import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.iterators.LongCombiner;
+import org.apache.accumulo.core.iterators.user.SummingCombiner;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.core.util.SimpleThreadPool;
+import org.apache.accumulo.testing.core.randomwalk.Environment;
+import org.apache.accumulo.testing.core.randomwalk.State;
+import org.apache.accumulo.testing.core.randomwalk.Test;
+import org.apache.hadoop.fs.FileSystem;
+
+/**
+ * Prepares a bulk test run: chooses a table name, creates the table with a summing combiner, and stores the shared helpers in the state.
+ */
+public class Setup extends Test {
+
+  private static final int MAX_POOL_SIZE = 8;
+  static String tableName = null;
+
+  /**
+   * Stores "rand", "fs", "bulkImportSuccess" and "pool" in {@code state} and resets the {@link BulkPlusOne} marker counter.
+   */
+  @Override
+  public void visit(State state, Environment env, Properties props) throws Exception {
+    Random random = new Random();
+    // Table name is built from host, pid and time; '-' and '.' in the host name are replaced by '_'
+    String host = InetAddress.getLocalHost().getHostName().replaceAll("[-.]", "_");
+    tableName = String.format("bulk_%s_%s_%d", host, env.getPid(), System.currentTimeMillis());
+    log.info("Starting bulk test on " + tableName);
+
+    TableOperations ops = env.getConnector().tableOperations();
+    try {
+      createTableIfMissing(ops);
+    } catch (TableExistsException ex) {
+      // expected if there are multiple walkers
+    }
+
+    state.set("rand", random);
+    state.set("fs", FileSystem.get(CachedConfiguration.getInstance()));
+    state.set("bulkImportSuccess", "true");
+    BulkPlusOne.counter.set(0L);
+
+    state.set("pool", new SimpleThreadPool(MAX_POOL_SIZE, "bulkImportPool"));
+  }
+
+  /**
+   * Creates the test table and attaches a string-encoded {@link SummingCombiner} over all columns, unless the table already exists.
+   */
+  private static void createTableIfMissing(TableOperations ops) throws Exception {
+    if (ops.exists(getTableName())) {
+      return;
+    }
+    ops.create(getTableName());
+    IteratorSetting combiner = new IteratorSetting(10, SummingCombiner.class);
+    SummingCombiner.setEncodingType(combiner, LongCombiner.Type.STRING);
+    SummingCombiner.setCombineAllColumns(combiner, true);
+    ops.attachIterator(getTableName(), combiner);
+  }
+
+  /**
+   * Returns the table name chosen by the most recent {@code visit}, or {@code null} if none has run yet.
+   */
+  public static String getTableName() {
+    return tableName;
+  }
+
+  /**
+   * Returns the executor stored under "pool" in {@code state}.
+   */
+  public static ThreadPoolExecutor getThreadPool(State state) {
+    return (ThreadPoolExecutor) state.get("pool");
+  }
+
+  /**
+   * Submits {@code r} to the executor stored in {@code state}.
+   */
+  public static void run(State state, Runnable r) {
+    ThreadPoolExecutor executor = getThreadPool(state);
+    executor.submit(r);
+  }
+
+}


Mime
View raw message