kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From granthe...@apache.org
Subject [2/5] kudu git commit: KUDU-2411: (Part 1) Break out existing test utilities into a seperate module
Date Wed, 17 Oct 2018 20:43:06 GMT
http://git-wip-us.apache.org/repos/asf/kudu/blob/15f1416f/java/kudu-test-utils/src/main/java/org/apache/kudu/test/ClientTestUtil.java
----------------------------------------------------------------------
diff --git a/java/kudu-test-utils/src/main/java/org/apache/kudu/test/ClientTestUtil.java b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/ClientTestUtil.java
new file mode 100644
index 0000000..9b67a9b
--- /dev/null
+++ b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/ClientTestUtil.java
@@ -0,0 +1,376 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.test;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.stumbleupon.async.Callback;
+import com.stumbleupon.async.Deferred;
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.ColumnTypeAttributes;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.AsyncKuduClient;
+import org.apache.kudu.client.AsyncKuduScanner;
+import org.apache.kudu.client.AsyncKuduSession;
+import org.apache.kudu.client.CreateTableOptions;
+import org.apache.kudu.client.Insert;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.KuduPredicate;
+import org.apache.kudu.client.KuduScanToken;
+import org.apache.kudu.client.KuduScanner;
+import org.apache.kudu.client.KuduSession;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.PartialRow;
+import org.apache.kudu.client.RowResult;
+import org.apache.kudu.client.RowResultIterator;
+import org.apache.kudu.util.DecimalUtil;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Utilities useful for cluster testing.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public abstract class ClientTestUtil {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ClientTestUtil.class);
+
+  public static final Callback<Object, Object> defaultErrorCB = new Callback<Object, Object>() {
+    @Override
+    public Object call(Object arg) throws Exception {
+      if (arg == null) {
+        return null;
+      }
+      if (arg instanceof Exception) {
+        LOG.warn("Got exception", (Exception) arg);
+      } else {
+        LOG.warn("Got an error response back {}", arg);
+      }
+      return new Exception("cannot recover from error: " + arg);
+    }
+  };
+
+  /**
+   * Counts the rows from the {@code scanner} until exhaustion. It doesn't require the scanner to
+   * be new, so it can be used to finish scanning a previously-started scan.
+   */
+  public static int countRowsInScan(AsyncKuduScanner scanner, long timeoutMs) throws Exception {
+    final AtomicInteger counter = new AtomicInteger();
+
+    Callback<Object, RowResultIterator> cb = new Callback<Object, RowResultIterator>() {
+      @Override
+      public Object call(RowResultIterator arg) throws Exception {
+        if (arg == null) return null;
+        counter.addAndGet(arg.getNumRows());
+        return null;
+      }
+    };
+
+    while (scanner.hasMoreRows()) {
+      Deferred<RowResultIterator> data = scanner.nextRows();
+      data.addCallbacks(cb, defaultErrorCB);
+      data.join(timeoutMs);
+    }
+    return counter.get();
+  }
+
+  /**
+   * Same as {@link #countRowsInScan(AsyncKuduScanner, long)}, but defaults the timeout to 60
+   * seconds.
+   */
+  public static int countRowsInScan(AsyncKuduScanner scanner) throws Exception {
+    return countRowsInScan(scanner, 60000);
+  }
+
+  public static int countRowsInScan(KuduScanner scanner) throws KuduException {
+    int counter = 0;
+    while (scanner.hasMoreRows()) {
+      counter += scanner.nextRows().getNumRows();
+    }
+    return counter;
+  }
+
+  /**
+   * Scans the table and returns the number of rows.
+   * @param table the table
+   * @param predicates optional predicates to apply to the scan
+   * @return the number of rows in the table matching the predicates
+   */
+  public static long countRowsInTable(KuduTable table, KuduPredicate... predicates) throws KuduException {
+    KuduScanner.KuduScannerBuilder scanBuilder =
+        table.getAsyncClient().syncClient().newScannerBuilder(table);
+    for (KuduPredicate predicate : predicates) {
+      scanBuilder.addPredicate(predicate);
+    }
+    scanBuilder.setProjectedColumnIndexes(ImmutableList.<Integer>of());
+    return countRowsInScan(scanBuilder.build());
+  }
+
+  /**
+   * Counts the rows in the provided scan tokens.
+   */
+  public static int countScanTokenRows(List<KuduScanToken> tokens, final String masterAddresses,
+                                       final long operationTimeoutMs)
+      throws IOException, InterruptedException {
+    final AtomicInteger count = new AtomicInteger(0);
+    List<Thread> threads = new ArrayList<>();
+    for (final KuduScanToken token : tokens) {
+      final byte[] serializedToken = token.serialize();
+      Thread thread = new Thread(new Runnable() {
+        @Override
+        public void run() {
+          try (KuduClient contextClient = new KuduClient.KuduClientBuilder(masterAddresses)
+                   .defaultAdminOperationTimeoutMs(operationTimeoutMs)
+                   .build()) {
+            KuduScanner scanner = KuduScanToken.deserializeIntoScanner(serializedToken, contextClient);
+            try {
+              int localCount = 0;
+              while (scanner.hasMoreRows()) {
+                localCount += Iterators.size(scanner.nextRows());
+              }
+              count.addAndGet(localCount);
+            } finally {
+              scanner.close();
+            }
+          } catch (Exception e) {
+            LOG.error("exception in parallel token scanner", e);
+          }
+        }
+      });
+      thread.run();
+      threads.add(thread);
+    }
+
+    for (Thread thread : threads) {
+      thread.join();
+    }
+    return count.get();
+  }
+
+  public static List<String> scanTableToStrings(KuduTable table,
+                                                KuduPredicate... predicates) throws Exception {
+    List<String> rowStrings = Lists.newArrayList();
+    KuduScanner.KuduScannerBuilder scanBuilder =
+        table.getAsyncClient().syncClient().newScannerBuilder(table);
+    for (KuduPredicate predicate : predicates) {
+      scanBuilder.addPredicate(predicate);
+    }
+    KuduScanner scanner = scanBuilder.build();
+    while (scanner.hasMoreRows()) {
+      RowResultIterator rows = scanner.nextRows();
+      for (RowResult r : rows) {
+        rowStrings.add(r.rowToString());
+      }
+    }
+    Collections.sort(rowStrings);
+    return rowStrings;
+  }
+
+  public static Schema getSchemaWithAllTypes() {
+    List<ColumnSchema> columns =
+        ImmutableList.of(
+            new ColumnSchema.ColumnSchemaBuilder("int8", Type.INT8).key(true).build(),
+            new ColumnSchema.ColumnSchemaBuilder("int16", Type.INT16).build(),
+            new ColumnSchema.ColumnSchemaBuilder("int32", Type.INT32).build(),
+            new ColumnSchema.ColumnSchemaBuilder("int64", Type.INT64).build(),
+            new ColumnSchema.ColumnSchemaBuilder("bool", Type.BOOL).build(),
+            new ColumnSchema.ColumnSchemaBuilder("float", Type.FLOAT).build(),
+            new ColumnSchema.ColumnSchemaBuilder("double", Type.DOUBLE).build(),
+            new ColumnSchema.ColumnSchemaBuilder("string", Type.STRING).build(),
+            new ColumnSchema.ColumnSchemaBuilder("binary-array", Type.BINARY).build(),
+            new ColumnSchema.ColumnSchemaBuilder("binary-bytebuffer", Type.BINARY).build(),
+            new ColumnSchema.ColumnSchemaBuilder("null", Type.STRING).nullable(true).build(),
+            new ColumnSchema.ColumnSchemaBuilder("timestamp", Type.UNIXTIME_MICROS).build(),
+            new ColumnSchema.ColumnSchemaBuilder("decimal", Type.DECIMAL)
+                .typeAttributes(DecimalUtil.typeAttributes(5, 3)).build());
+
+    return new Schema(columns);
+  }
+
+  public static CreateTableOptions getAllTypesCreateTableOptions() {
+    return new CreateTableOptions().setRangePartitionColumns(ImmutableList.of("int8"));
+  }
+
+  public static Schema getBasicSchema() {
+    ArrayList<ColumnSchema> columns = new ArrayList<>(5);
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32).key(true).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("column1_i", Type.INT32).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("column2_i", Type.INT32).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("column3_s", Type.STRING)
+                    .nullable(true)
+                    .desiredBlockSize(4096)
+                    .encoding(ColumnSchema.Encoding.DICT_ENCODING)
+                    .compressionAlgorithm(ColumnSchema.CompressionAlgorithm.LZ4)
+                    .build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("column4_b", Type.BOOL).build());
+    return new Schema(columns);
+  }
+
+  public static CreateTableOptions getBasicCreateTableOptions() {
+    return new CreateTableOptions().setRangePartitionColumns(ImmutableList.of("key"));
+  }
+
+  /**
+   * Creates table options with non-covering range partitioning for a table with
+   * the basic schema. Range partition key ranges fall between the following values:
+   *
+   * [  0,  50)
+   * [ 50, 100)
+   * [200, 300)
+   */
+  public static CreateTableOptions getBasicTableOptionsWithNonCoveredRange() {
+    Schema schema = getBasicSchema();
+    CreateTableOptions option = new CreateTableOptions();
+    option.setRangePartitionColumns(ImmutableList.of("key"));
+
+    PartialRow aLowerBound = schema.newPartialRow();
+    aLowerBound.addInt("key", 0);
+    PartialRow aUpperBound = schema.newPartialRow();
+    aUpperBound.addInt("key", 100);
+    option.addRangePartition(aLowerBound, aUpperBound);
+
+    PartialRow bLowerBound = schema.newPartialRow();
+    bLowerBound.addInt("key", 200);
+    PartialRow bUpperBound = schema.newPartialRow();
+    bUpperBound.addInt("key", 300);
+    option.addRangePartition(bLowerBound, bUpperBound);
+
+    PartialRow split = schema.newPartialRow();
+    split.addInt("key", 50);
+    option.addSplitRow(split);
+    return option;
+  }
+
+  /**
+   * A generic helper function to create a table with default test options.
+   */
+  public static KuduTable createDefaultTable(KuduClient client, String tableName) throws KuduException {
+    return client.createTable(tableName, getBasicSchema(), getBasicCreateTableOptions());
+  }
+
+  /**
+   * Load a table of default schema with the specified number of records, in ascending key order.
+   */
+  public static void loadDefaultTable(KuduClient client, String tableName, int numRows)
+      throws KuduException {
+    KuduTable table = client.openTable(tableName);
+    KuduSession session = client.newSession();
+    for (int i = 0; i < numRows; i++) {
+      Insert insert = createBasicSchemaInsert(table, i);
+      session.apply(insert);
+    }
+    session.flush();
+    session.close();
+  }
+
+  public static Insert createBasicSchemaInsert(KuduTable table, int key) {
+    Insert insert = table.newInsert();
+    PartialRow row = insert.getRow();
+    row.addInt(0, key);
+    row.addInt(1, 2);
+    row.addInt(2, 3);
+    row.addString(3, "a string");
+    row.addBoolean(4, true);
+    return insert;
+  }
+
+  public static KuduTable createFourTabletsTableWithNineRows(AsyncKuduClient client,
+                                                             String tableName,
+                                                             final long timeoutMs)
+      throws Exception {
+    final int[] KEYS = new int[] { 10, 20, 30 };
+    final Schema basicSchema = getBasicSchema();
+    CreateTableOptions builder = getBasicCreateTableOptions();
+    for (int i : KEYS) {
+      PartialRow splitRow = basicSchema.newPartialRow();
+      splitRow.addInt(0, i);
+      builder.addSplitRow(splitRow);
+    }
+    KuduTable table = client.syncClient().createTable(tableName, basicSchema, builder);
+    AsyncKuduSession session = client.newSession();
+
+    // create a table with on empty tablet and 3 tablets of 3 rows each
+    for (int key1 : KEYS) {
+      for (int key2 = 1; key2 <= 3; key2++) {
+        Insert insert = table.newInsert();
+        PartialRow row = insert.getRow();
+        row.addInt(0, key1 + key2);
+        row.addInt(1, key1);
+        row.addInt(2, key2);
+        row.addString(3, "a string");
+        row.addBoolean(4, true);
+        session.apply(insert).join(timeoutMs);
+      }
+    }
+    session.close().join(timeoutMs);
+    return table;
+  }
+
+  public static Schema createManyStringsSchema() {
+    ArrayList<ColumnSchema> columns = new ArrayList<ColumnSchema>(4);
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.STRING).key(true).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("c1", Type.STRING).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("c2", Type.STRING).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("c3", Type.STRING).nullable(true).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("c4", Type.STRING).nullable(true).build());
+    return new Schema(columns);
+  }
+
+  public static Schema createSchemaWithBinaryColumns() {
+    ArrayList<ColumnSchema> columns = new ArrayList<ColumnSchema>();
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.BINARY).key(true).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("c1", Type.STRING).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("c2", Type.DOUBLE).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("c3", Type.BINARY).nullable(true).build());
+    return new Schema(columns);
+  }
+
+  public static Schema createSchemaWithTimestampColumns() {
+    ArrayList<ColumnSchema> columns = new ArrayList<ColumnSchema>();
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.UNIXTIME_MICROS).key(true).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("c1", Type.UNIXTIME_MICROS).nullable(true).build());
+    return new Schema(columns);
+  }
+
+  public static Schema createSchemaWithDecimalColumns() {
+    ArrayList<ColumnSchema> columns = new ArrayList<ColumnSchema>();
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.DECIMAL).key(true)
+        .typeAttributes(
+            new ColumnTypeAttributes.ColumnTypeAttributesBuilder()
+                .precision(DecimalUtil.MAX_DECIMAL64_PRECISION).build()
+        ).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("c1", Type.DECIMAL).nullable(true)
+        .typeAttributes(
+            new ColumnTypeAttributes.ColumnTypeAttributesBuilder()
+                .precision(DecimalUtil.MAX_DECIMAL128_PRECISION).build()
+        ).build());
+    return new Schema(columns);
+  }
+}

http://git-wip-us.apache.org/repos/asf/kudu/blob/15f1416f/java/kudu-test-utils/src/main/java/org/apache/kudu/test/KuduTestHarness.java
----------------------------------------------------------------------
diff --git a/java/kudu-test-utils/src/main/java/org/apache/kudu/test/KuduTestHarness.java b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/KuduTestHarness.java
new file mode 100644
index 0000000..172989e
--- /dev/null
+++ b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/KuduTestHarness.java
@@ -0,0 +1,444 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.kudu.test;
+
+import com.google.common.base.Stopwatch;
+import com.stumbleupon.async.Deferred;
+import org.apache.kudu.Common;
+import org.apache.kudu.client.AsyncKuduClient;
+import org.apache.kudu.client.AsyncKuduClient.AsyncKuduClientBuilder;
+import org.apache.kudu.client.DeadlineTracker;
+import org.apache.kudu.client.HostAndPort;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.LocatedTablet;
+import org.apache.kudu.client.RemoteTablet;
+import org.apache.kudu.master.Master;
+import org.apache.kudu.test.cluster.MiniKuduCluster;
+import org.apache.kudu.test.cluster.MiniKuduCluster.MiniKuduClusterBuilder;
+import org.apache.kudu.test.cluster.FakeDNS;
+import org.apache.kudu.test.junit.RetryRule;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+import org.junit.rules.ExternalResource;
+import org.junit.runner.Description;
+import org.junit.runners.model.Statement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.fail;
+
+/**
+ * A Junit Rule that manages a Kudu cluster and clients for testing.
+ * This rule also includes utility methods for the cluster
+ * and clients.
+ *
+ * <pre>
+ * public static class TestFoo {
+ *
+ *  &#064;Rule
+ *  public KuduTestHarness harness = new KuduTestHarness();
+ *
+ *  ...
+ * }
+ * </pre>
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class KuduTestHarness extends ExternalResource {
+
+  private static final Logger LOG = LoggerFactory.getLogger(KuduTestHarness.class);
+
+  private static final int NUM_MASTER_SERVERS = 3;
+  private static final int NUM_TABLET_SERVERS = 3;
+
+  // Default timeout/sleep interval for various client operations,
+  // waiting for various jobs/threads to complete, etc.
+  public static final int DEFAULT_SLEEP = 50000;
+
+  private final Random randomForTSRestart = RandomUtils.getRandom();
+
+  private MiniKuduClusterBuilder clusterBuilder;
+  private MiniKuduCluster miniCluster;
+
+  // We create both versions of the asyncClient for ease of use.
+  private AsyncKuduClient asyncClient;
+  private KuduClient client;
+
+  public KuduTestHarness(final MiniKuduClusterBuilder clusterBuilder) {
+    this.clusterBuilder = clusterBuilder;
+  }
+
+  public KuduTestHarness() {
+    this.clusterBuilder = getBaseClusterBuilder();
+  }
+
+  /**
+   * Returns the base MiniKuduClusterBuilder used when creating a
+   * KuduTestHarness with the default constructor. This is useful
+   * if you want to add to the default cluster setup.
+   */
+  public static MiniKuduClusterBuilder getBaseClusterBuilder() {
+    return new MiniKuduClusterBuilder()
+        .numMasterServers(NUM_MASTER_SERVERS)
+        .numTabletServers(NUM_TABLET_SERVERS);
+  }
+
+  @Override
+  public Statement apply(Statement base, Description description) {
+    // Set any master server flags defined in the method level annotation.
+    MasterServerConfig masterServerConfig = description.getAnnotation(MasterServerConfig.class);
+    if (masterServerConfig != null) {
+      for (String flag : masterServerConfig.flags()) {
+        clusterBuilder.addMasterServerFlag(flag);
+      }
+    }
+    // Set any tablet server flags defined in the method level annotation.
+    TabletServerConfig tabletServerConfig = description.getAnnotation(TabletServerConfig.class);
+    if (tabletServerConfig != null) {
+      for (String flag : tabletServerConfig.flags()) {
+        clusterBuilder.addTabletServerFlag(flag);
+      }
+    }
+
+    // Generate the ExternalResource Statement.
+    Statement statement = super.apply(base, description);
+    // Wrap in the RetryRule to rerun flaky tests.
+    return new RetryRule().apply(statement, description);
+  }
+
+  @Override
+  public void before() throws Exception {
+    FakeDNS.getInstance().install();
+    LOG.info("Creating a new MiniKuduCluster...");
+    miniCluster = clusterBuilder.build();
+    LOG.info("Creating a new Kudu client...");
+    asyncClient = new AsyncKuduClientBuilder(miniCluster.getMasterAddressesAsString())
+        .defaultAdminOperationTimeoutMs(DEFAULT_SLEEP)
+        .build();
+    client = asyncClient.syncClient();
+  }
+
+  @Override
+  public void after() {
+    try {
+      if (client != null) {
+        client.shutdown();
+        // No need to explicitly shutdown the async client,
+        // shutting down the sync client effectively does that.
+      }
+    } catch (KuduException e) {
+      LOG.warn("Error while shutting down the test client");
+    } finally {
+      if (miniCluster != null) {
+        miniCluster.shutdown();
+      }
+    }
+  }
+
+  public KuduClient getClient() {
+    return client;
+  }
+
+  public AsyncKuduClient getAsyncClient() {
+    return asyncClient;
+  }
+
+  /**
+   * Helper method to easily kill a tablet server that serves the given table's only tablet's
+   * leader. The currently running test case will be failed if there's more than one tablet,
+   * if the tablet has no leader after some retries, or if the tablet server was already killed.
+   *
+   * This method is thread-safe.
+   * @param table a KuduTable which will get its single tablet's leader killed.
+   * @throws Exception
+   */
+  public void killTabletLeader(KuduTable table) throws Exception {
+    List<LocatedTablet> tablets = table.getTabletsLocations(DEFAULT_SLEEP);
+    if (tablets.isEmpty() || tablets.size() > 1) {
+      fail("Currently only support killing leaders for tables containing 1 tablet, table " +
+          table.getName() + " has " + tablets.size());
+    }
+    LocatedTablet tablet = tablets.get(0);
+    if (tablet.getReplicas().size() == 1) {
+      fail("Table " + table.getName() + " only has 1 tablet, please enable replication");
+    }
+
+    HostAndPort hp = findLeaderTabletServer(tablet);
+    miniCluster.killTabletServer(hp);
+  }
+
+  /**
+   * Helper method to kill a tablet server that serves the given tablet's
+   * leader. The currently running test case will be failed if the tablet has no
+   * leader after some retries, or if the tablet server was already killed.
+   *
+   * This method is thread-safe.
+   * @param tablet a RemoteTablet which will get its leader killed
+   * @throws Exception
+   */
+  public void killTabletLeader(RemoteTablet tablet) throws Exception {
+    killTabletLeader(new LocatedTablet(tablet));
+  }
+
+  /**
+   * Helper method to kill a tablet server that serves the given tablet's
+   * leader. The currently running test case will be failed if the tablet has no
+   * leader after some retries, or if the tablet server was already killed.
+   *
+   * This method is thread-safe.
+   * @param tablet a LocatedTablet which will get its leader killed
+   * @throws Exception
+   */
+  public void killTabletLeader(LocatedTablet tablet) throws Exception {
+    HostAndPort hp = findLeaderTabletServer(tablet);
+    miniCluster.killTabletServer(hp);
+  }
+
+  /**
+   * Finds the RPC port of the given tablet's leader tserver.
+   * @param tablet a LocatedTablet
+   * @return the host and port of the given tablet's leader tserver
+   * @throws Exception if we are unable to find the leader tserver
+   */
+  public HostAndPort findLeaderTabletServer(LocatedTablet tablet)
+      throws Exception {
+    LocatedTablet.Replica leader = null;
+    DeadlineTracker deadlineTracker = new DeadlineTracker();
+    deadlineTracker.setDeadline(DEFAULT_SLEEP);
+    while (leader == null) {
+      if (deadlineTracker.timedOut()) {
+        fail("Timed out while trying to find a leader for this table");
+      }
+
+      leader = tablet.getLeaderReplica();
+      if (leader == null) {
+        LOG.info("Sleeping while waiting for a tablet LEADER to arise, currently slept {} ms",
+            deadlineTracker.getElapsedMillis());
+        Thread.sleep(50);
+      }
+    }
+    return new HostAndPort(leader.getRpcHost(), leader.getRpcPort());
+  }
+
+  /**
+   * Helper method to easily kill the leader master.
+   *
+   * This method is thread-safe.
+   * @throws Exception if there is an error finding or killing the leader master.
+   */
+  public void killLeaderMasterServer() throws Exception {
+    HostAndPort hp = findLeaderMasterServer();
+    miniCluster.killMasterServer(hp);
+  }
+
+  /**
+   * Find the host and port of the leader master.
+   * @return the host and port of the leader master
+   * @throws Exception if we are unable to find the leader master
+   */
+  public HostAndPort findLeaderMasterServer() throws Exception {
+    Stopwatch sw = Stopwatch.createStarted();
+    while (sw.elapsed(TimeUnit.MILLISECONDS) < DEFAULT_SLEEP) {
+      Deferred<Master.GetTableLocationsResponsePB> masterLocD =
+          asyncClient.getMasterTableLocationsPB(null);
+      Master.GetTableLocationsResponsePB r = masterLocD.join(DEFAULT_SLEEP);
+      Common.HostPortPB pb = r.getTabletLocations(0)
+          .getReplicas(0)
+          .getTsInfo()
+          .getRpcAddresses(0);
+      if (pb.getPort() != -1) {
+        return new HostAndPort(pb.getHost(), pb.getPort());
+      }
+    }
+    throw new IOException(String.format("No leader master found after %d ms", DEFAULT_SLEEP));
+  }
+
+  /**
+   * Picks at random a tablet server that serves tablets from the passed table and restarts it.
+   * @param table table to query for a TS to restart
+   * @throws Exception
+   */
+  public void restartTabletServer(KuduTable table) throws Exception {
+    List<LocatedTablet> tablets = table.getTabletsLocations(DEFAULT_SLEEP);
+    if (tablets.isEmpty()) {
+      fail("Table " + table.getName() + " doesn't have any tablets");
+    }
+
+    LocatedTablet tablet = tablets.get(0);
+    LocatedTablet.Replica replica =
+        tablet.getReplicas().get(randomForTSRestart.nextInt(tablet.getReplicas().size()));
+    HostAndPort hp = new HostAndPort(replica.getRpcHost(), replica.getRpcPort());
+    miniCluster.killTabletServer(hp);
+    miniCluster.startTabletServer(hp);
+  }
+
+  /**
+   * Kills a tablet server that serves the given tablet's leader and restarts it.
+   * @param tablet a RemoteTablet which will get its leader killed and restarted
+   * @throws Exception
+   */
+  public void restartTabletServer(RemoteTablet tablet) throws Exception {
+    HostAndPort hp = findLeaderTabletServer(new LocatedTablet(tablet));
+    miniCluster.killTabletServer(hp);
+    miniCluster.startTabletServer(hp);
+  }
+
+  /**
+   * Kills and restarts the leader master.
+   * @throws Exception
+   */
+  public void restartLeaderMaster() throws Exception {
+    HostAndPort hp = findLeaderMasterServer();
+    miniCluster.killMasterServer(hp);
+    miniCluster.startMasterServer(hp);
+  }
+
+  /**
+   * Return the comma-separated list of "host:port" pairs that describes the master
+   * config for this cluster.
+   * @return The master config string.
+   */
+  public String getMasterAddressesAsString() {
+    return miniCluster.getMasterAddressesAsString();
+  }
+
+  /**
+   * @return the list of master servers
+   */
+  public List<HostAndPort> getMasterServers() {
+    return miniCluster.getMasterServers();
+  }
+
+  /**
+   * @return the list of tablet servers
+   */
+  public List<HostAndPort> getTabletServers() {
+    return miniCluster.getMasterServers();
+  }
+
+  /**
+   * @return path to the mini cluster root directory
+   */
+  public String getClusterRoot() {
+    return miniCluster.getClusterRoot();
+  }
+
+  /**
+   * Kills all the master servers.
+   * Does nothing to the servers that are already dead.
+   *
+   * @throws IOException
+   */
+  public void killAllMasterServers() throws IOException {
+    miniCluster.killAllMasterServers();
+  }
+
+  /**
+   * Starts all the master servers.
+   * Does nothing to the servers that are already running.
+   *
+   * @throws IOException
+   */
+  public void startAllMasterServers() throws IOException {
+    miniCluster.startAllMasterServers();
+  }
+
+  /**
+   * Kills all the tablet servers.
+   * Does nothing to the servers that are already dead.
+   *
+   * @throws IOException
+   */
+  public void killAllTabletServers() throws IOException {
+    miniCluster.killAllTabletServers();
+  }
+
+  /**
+   * Starts all the tablet servers.
+   * Does nothing to the servers that are already running.
+   *
+   * @throws IOException
+   */
+  public void startAllTabletServers() throws IOException {
+    miniCluster.startAllTabletServers();
+  }
+
+  /**
+   * Removes all credentials for all principals from the Kerberos credential cache.
+   */
+  public void kdestroy() throws IOException {
+    miniCluster.kdestroy();
+  }
+
+  /**
+   * Re-initialize Kerberos credentials for the given username, writing them
+   * into the Kerberos credential cache.
+   * @param username the username to kinit as
+   */
+  public void kinit(String username) throws IOException {
+    miniCluster.kinit(username);
+  }
+
+  /**
+   * Resets the clients so that their state is completely fresh, including meta
+   * cache, connections, open tables, sessions and scanners, and propagated timestamp.
+   */
+  public void resetClients() throws IOException {
+    client.shutdown();
+    asyncClient = new AsyncKuduClientBuilder(miniCluster.getMasterAddressesAsString())
+        .defaultAdminOperationTimeoutMs(DEFAULT_SLEEP)
+        .build();
+    client = asyncClient.syncClient();
+  }
+
+  /**
+   * An annotation that can be added to each test method to
+   * define additional master server flags to be used when
+   * creating the test cluster.
+   *
+   * ex: @MasterServerConfig(flags = { "key1=valA", "key2=valB" })
+   */
+  @Retention(RetentionPolicy.RUNTIME)
+  @Target({ElementType.METHOD})
+  public @interface MasterServerConfig {
+    String[] flags();
+  }
+
+  /**
+   * An annotation that can be added to each test method to
+   * define additional tablet server flags to be used when
+   * creating the test cluster.
+   *
+   * ex: @TabletServerConfig(flags = { "key1=valA", "key2=valB" })
+   */
+  @Retention(RetentionPolicy.RUNTIME)
+  @Target({ElementType.METHOD})
+  public @interface TabletServerConfig {
+    String[] flags();
+  }
+}

http://git-wip-us.apache.org/repos/asf/kudu/blob/15f1416f/java/kudu-test-utils/src/main/java/org/apache/kudu/test/ProtobufUtils.java
----------------------------------------------------------------------
diff --git a/java/kudu-test-utils/src/main/java/org/apache/kudu/test/ProtobufUtils.java b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/ProtobufUtils.java
new file mode 100644
index 0000000..5be17c2
--- /dev/null
+++ b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/ProtobufUtils.java
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.kudu.test;
+
+import com.google.protobuf.ByteString;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+import org.apache.kudu.Common;
+import org.apache.kudu.consensus.Metadata;
+import org.apache.kudu.master.Master;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class ProtobufUtils {
+
+  /**
+   * Get a PartitionPB with empty start and end keys.
+   * @return a fake partition
+   */
+  public static Common.PartitionPB.Builder getFakePartitionPB() {
+    Common.PartitionPB.Builder partition = Common.PartitionPB.newBuilder();
+    partition.setPartitionKeyStart(ByteString.EMPTY);
+    partition.setPartitionKeyEnd(ByteString.EMPTY);
+    return partition;
+  }
+
+  /**
+   * Create a ReplicaPB based on the passed information.
+   * @param uuid server's identifier
+   * @param host server's hostname
+   * @param port server's port
+   * @param role server's role in the configuration
+   * @return a fake ReplicaPB
+   */
+  public static Master.TabletLocationsPB.ReplicaPB.Builder getFakeTabletReplicaPB(
+      String uuid, String host, int port, Metadata.RaftPeerPB.Role role) {
+    Master.TSInfoPB.Builder tsInfoBuilder = Master.TSInfoPB.newBuilder();
+    Common.HostPortPB.Builder hostBuilder = Common.HostPortPB.newBuilder();
+    hostBuilder.setHost(host);
+    hostBuilder.setPort(port);
+    tsInfoBuilder.addRpcAddresses(hostBuilder);
+    tsInfoBuilder.setPermanentUuid(ByteString.copyFromUtf8(uuid));
+    Master.TabletLocationsPB.ReplicaPB.Builder replicaBuilder =
+        Master.TabletLocationsPB.ReplicaPB.newBuilder();
+    replicaBuilder.setTsInfo(tsInfoBuilder);
+    replicaBuilder.setRole(role);
+    return replicaBuilder;
+  }
+}

http://git-wip-us.apache.org/repos/asf/kudu/blob/15f1416f/java/kudu-test-utils/src/main/java/org/apache/kudu/test/RandomUtils.java
----------------------------------------------------------------------
diff --git a/java/kudu-test-utils/src/main/java/org/apache/kudu/test/RandomUtils.java b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/RandomUtils.java
new file mode 100644
index 0000000..0328e15
--- /dev/null
+++ b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/RandomUtils.java
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.kudu.test;
+
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Random;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class RandomUtils {
+  private static final Logger LOG = LoggerFactory.getLogger(RandomUtils.class);
+
+  private static final String TEST_RANDOM_SEED_PROP = "testRandomSeed";
+
+  /**
+   * Get an instance of Random for use in tests and logs the seed used.
+   *
+   * Uses a default seed of System.currentTimeMillis() with the option to
+   * override via the testRandomSeed system property.
+   */
+  public static Random getRandom() {
+    // First check the system property.
+    long seed = System.currentTimeMillis();
+    if (System.getProperty(TEST_RANDOM_SEED_PROP) != null) {
+      seed = Long.parseLong(System.getProperty(TEST_RANDOM_SEED_PROP));
+      LOG.info("System property {} is defined. Overriding random seed.", TEST_RANDOM_SEED_PROP, seed);
+    }
+    LOG.info("Using random seed: {}", seed);
+    return new Random(seed);
+  }
+}

http://git-wip-us.apache.org/repos/asf/kudu/blob/15f1416f/java/kudu-test-utils/src/main/java/org/apache/kudu/test/cluster/FakeDNS.java
----------------------------------------------------------------------
diff --git a/java/kudu-test-utils/src/main/java/org/apache/kudu/test/cluster/FakeDNS.java b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/cluster/FakeDNS.java
new file mode 100644
index 0000000..ca0fcbd
--- /dev/null
+++ b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/cluster/FakeDNS.java
@@ -0,0 +1,192 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.test.cluster;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.concurrent.GuardedBy;
+
+import com.google.common.base.Throwables;
+import com.google.common.net.InetAddresses;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+/**
+ * Fake DNS resolver which allows our tests to work well even though we use
+ * strange loopback IP addresses (127.x.y.z) with no corresponding reverse
+ * DNS.
+ *
+ * This overrides the reverse lookups for such IPs to return the same address
+ * in String form.
+ *
+ * Without this class, reverse DNS lookups for such addresses often take
+ * 5 seconds to return, causing timeouts and overall test slowness.
+ *
+ * In the future this class might also be extended to test more interesting
+ * DNS-related scenarios.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class FakeDNS {
+  static FakeDNS instance = new FakeDNS();
+
+  @GuardedBy("this")
+  private Map<String, InetAddress> forwardResolutions = new HashMap<>();
+
+  @GuardedBy("this")
+  private Map<InetAddress, String> reverseResolutions = new HashMap<>();
+
+  /** whether the fake resolver has been installed */
+  @GuardedBy("this")
+  private boolean installed = false;
+
+  private FakeDNS() {}
+  public static FakeDNS getInstance() {
+    return instance;
+  }
+
+  public synchronized void addForwardResolution(String hostname, InetAddress ip) {
+    forwardResolutions.put(hostname, ip);
+  }
+
+  public synchronized void addReverseResolution(InetAddress ip, String hostname) {
+    reverseResolutions.put(ip, hostname);
+  }
+
+  /**
+   * Install the fake DNS resolver into the Java runtime.
+   */
+  public synchronized void install() {
+    if (installed) return;
+    try {
+      try {
+        // Override the NameService in Java 9 or later.
+        final Class<?> nameServiceInterface = Class.forName("java.net.InetAddress$NameService");
+        Field field = InetAddress.class.getDeclaredField("nameService");
+        // Get the default NameService to fallback to.
+        Method method = InetAddress.class.getDeclaredMethod("createNameService");
+        method.setAccessible(true);
+        Object fallbackNameService = method.invoke(null);
+        // Create a proxy instance to set on the InetAddress field which will handle
+        // all NameService calls.
+        Object proxy = Proxy.newProxyInstance(nameServiceInterface.getClassLoader(),
+            new Class<?>[]{nameServiceInterface}, new NameServiceListener(fallbackNameService));
+        field.setAccessible(true);
+        field.set(InetAddress.class, proxy);
+      } catch (final ClassNotFoundException | NoSuchFieldException e) {
+        // Override the NameService in Java 8 or earlier.
+        final Class<?> nameServiceInterface = Class.forName("sun.net.spi.nameservice.NameService");
+        Field field = InetAddress.class.getDeclaredField("nameServices");
+        // Get the default NameService to fallback to.
+        Method method = InetAddress.class.getDeclaredMethod("createNSProvider", String.class);
+        method.setAccessible(true);
+        Object fallbackNameService = method.invoke(null, "default");
+        // Create a proxy instance to set on the InetAddress field which will handle
+        // all NameService calls.
+        Object proxy = Proxy.newProxyInstance(nameServiceInterface.getClassLoader(),
+            new Class<?>[]{nameServiceInterface}, new NameServiceListener(fallbackNameService));
+        field.setAccessible(true);
+        // Java 8 or earlier takes a list of NameServices
+        field.set(InetAddress.class, Arrays.asList(proxy));
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    installed = true;
+  }
+
+  /**
+   * The NameService in all versions of Java has the same interface, so we
+   * can use the same InvocationHandler as our proxy instance for both
+   * java.net.InetAddress$NameService and sun.net.spi.nameservice.NameService.
+   */
+  private class NameServiceListener implements InvocationHandler {
+
+    private final Object fallbackNameService;
+
+    // Creates a NameServiceListener with a NameService implementation to
+    // fallback to. The parameter is untyped so we can handle the NameService
+    // type in all versions of Java with reflection.
+    NameServiceListener(Object fallbackNameService) {
+      this.fallbackNameService = fallbackNameService;
+    }
+
+    private InetAddress[] lookupAllHostAddr(String host) throws UnknownHostException {
+      InetAddress inetAddress;
+      synchronized(FakeDNS.this) {
+        inetAddress = forwardResolutions.get(host);
+      }
+      if (inetAddress != null) {
+        return new InetAddress[]{inetAddress};
+      }
+
+      try {
+        Method method = fallbackNameService.getClass()
+            .getDeclaredMethod("lookupAllHostAddr", String.class);
+        method.setAccessible(true);
+        return (InetAddress[]) method.invoke(fallbackNameService, host);
+      } catch (ReflectiveOperationException e) {
+        Throwables.propagateIfPossible(e.getCause(), UnknownHostException.class);
+        throw new AssertionError("unexpected reflection issue", e);
+      }
+    }
+
+    private String getHostByAddr(byte[] addr) throws UnknownHostException {
+      if (addr[0] == 127) {
+        return InetAddresses.toAddrString(InetAddress.getByAddress(addr));
+      }
+
+      String hostname;
+      synchronized (FakeDNS.this) {
+        hostname = reverseResolutions.get(InetAddress.getByAddress(addr));
+      }
+      if (hostname != null) {
+        return hostname;
+      }
+
+      try {
+        Method method = fallbackNameService.getClass()
+            .getDeclaredMethod("getHostByAddr", byte[].class);
+        method.setAccessible(true);
+        return (String) method.invoke(fallbackNameService, (Object) addr);
+      } catch (ReflectiveOperationException e) {
+        Throwables.propagateIfPossible(e.getCause(), UnknownHostException.class);
+        throw new AssertionError("unexpected reflection issue", e);
+      }
+    }
+
+    @Override
+    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+      switch (method.getName()) {
+        case "lookupAllHostAddr":
+          return lookupAllHostAddr((String) args[0]);
+        case "getHostByAddr":
+          return getHostByAddr((byte[]) args[0]);
+        default:
+          throw new UnsupportedOperationException();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/kudu/blob/15f1416f/java/kudu-test-utils/src/main/java/org/apache/kudu/test/cluster/KuduBinaryLocator.java
----------------------------------------------------------------------
diff --git a/java/kudu-test-utils/src/main/java/org/apache/kudu/test/cluster/KuduBinaryLocator.java b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/cluster/KuduBinaryLocator.java
new file mode 100644
index 0000000..9c5597f
--- /dev/null
+++ b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/cluster/KuduBinaryLocator.java
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.kudu.test.cluster;
+
+import com.google.common.io.CharStreams;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Reader;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class KuduBinaryLocator {
+  private static final Logger LOG = LoggerFactory.getLogger(KuduBinaryLocator.class);
+
+  private static final String KUDU_BIN_DIR_PROP = "kuduBinDir";
+
+  /**
+   * Find the binary directory within the build tree.
+   *
+   * Uses the following priority:
+   *    - If kuduBinDir system property is set, use that.
+   *    - If the `kudu` binary is found on the PATH using `which kudu`,
+   *      use its parent directory.
+   */
+  private static String findBinaryDir() {
+    // If kuduBinDir system property is set, use that.
+    String kuduBinDirProp = System.getProperty(KUDU_BIN_DIR_PROP);
+    if (kuduBinDirProp != null) {
+      LOG.info("Using Kudu binary directory specified by system property '{}': {}",
+          KUDU_BIN_DIR_PROP, kuduBinDirProp);
+      return kuduBinDirProp;
+    }
+
+    // If the `kudu` binary is found on the PATH using `which kudu`, use its parent directory.
+    try {
+      Runtime runtime = Runtime.getRuntime();
+      Process process = runtime.exec("which kudu");
+      int errorCode = process.waitFor();
+      if (errorCode == 0) {
+        try(Reader reader = new InputStreamReader(process.getInputStream(), UTF_8)) {
+          String kuduBinary = CharStreams.toString(reader);
+          String kuduBinDir = new File(kuduBinary).getParent();
+          LOG.info("Using Kudu binary directory found on path with 'which kudu': {}", kuduBinDir);
+          return kuduBinDir;
+        }
+      }
+    } catch (IOException | InterruptedException ex) {
+      throw new RuntimeException("Error while locating kudu binary", ex);
+    }
+
+    throw new RuntimeException("Could not locate the kudu binary directory. " +
+        "Set the system variable " + KUDU_BIN_DIR_PROP +
+        " or ensure the `kudu` binary is on your path.");
+  }
+
+  /**
+   * @param binName the binary to look for (eg 'kudu-tserver')
+   * @return the absolute path of that binary
+   * @throws FileNotFoundException if no such binary is found
+   */
+  public static String findBinary(String binName) throws FileNotFoundException {
+    String binDir = findBinaryDir();
+
+    File candidate = new File(binDir, binName);
+    if (candidate.canExecute()) {
+      return candidate.getAbsolutePath();
+    }
+    throw new FileNotFoundException("Cannot find binary " + binName +
+        " in binary directory " + binDir);
+  }
+}

http://git-wip-us.apache.org/repos/asf/kudu/blob/15f1416f/java/kudu-test-utils/src/main/java/org/apache/kudu/test/cluster/MiniKuduCluster.java
----------------------------------------------------------------------
diff --git a/java/kudu-test-utils/src/main/java/org/apache/kudu/test/cluster/MiniKuduCluster.java b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/cluster/MiniKuduCluster.java
new file mode 100644
index 0000000..dae4344
--- /dev/null
+++ b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/cluster/MiniKuduCluster.java
@@ -0,0 +1,643 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.kudu.test.cluster;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.BufferedReader;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import org.apache.kudu.Common;
+import org.apache.kudu.client.HostAndPort;
+import org.apache.kudu.client.ProtobufHelper;
+import org.apache.kudu.test.KuduTestHarness;
+import org.apache.kudu.tools.Tool.ControlShellRequestPB;
+import org.apache.kudu.tools.Tool.ControlShellResponsePB;
+import org.apache.kudu.tools.Tool.CreateClusterRequestPB.MiniKdcOptionsPB;
+import org.apache.kudu.tools.Tool.CreateClusterRequestPB;
+import org.apache.kudu.tools.Tool.DaemonIdentifierPB;
+import org.apache.kudu.tools.Tool.DaemonInfoPB;
+import org.apache.kudu.tools.Tool.GetKDCEnvVarsRequestPB;
+import org.apache.kudu.tools.Tool.GetMastersRequestPB;
+import org.apache.kudu.tools.Tool.GetTServersRequestPB;
+import org.apache.kudu.tools.Tool.KdestroyRequestPB;
+import org.apache.kudu.tools.Tool.KinitRequestPB;
+import org.apache.kudu.tools.Tool.StartClusterRequestPB;
+import org.apache.kudu.tools.Tool.StartDaemonRequestPB;
+import org.apache.kudu.tools.Tool.StopDaemonRequestPB;
+import org.apache.kudu.util.SecurityUtil;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class to start and manipulate Kudu clusters. Depends on precompiled
+ * kudu, kudu-master, and kudu-tserver binaries. {@link KuduTestHarness}
+ * should be used instead of directly using this class in almost all cases.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class MiniKuduCluster implements AutoCloseable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MiniKuduCluster.class);
+
+  // Control shell process.
+  private Process miniCluster;
+
+  // Request channel to the control shell.
+  private DataOutputStream miniClusterStdin;
+
+  // Response channel from the control shell.
+  private DataInputStream miniClusterStdout;
+
+  // Thread that reads and logs stderr from the control shell.
+  private Thread miniClusterErrorPrinter;
+
+  private static class DaemonInfo {
+    DaemonIdentifierPB id;
+    boolean isRunning;
+  }
+
+  // Map of master addresses to daemon information.
+  private final Map<HostAndPort, DaemonInfo> masterServers = Maps.newHashMap();
+
+  // Map of tserver addresses to daemon information.
+  private final Map<HostAndPort, DaemonInfo> tabletServers = Maps.newHashMap();
+
+  // Builder-provided cluster configuration state.
+  private final boolean enableKerberos;
+  private final int numMasters;
+  private final int numTservers;
+  private final ImmutableList<String> extraTserverFlags;
+  private final ImmutableList<String> extraMasterFlags;
+  private final String clusterRoot;
+
+  private MiniKdcOptionsPB kdcOptionsPb;
+  private final Common.HmsMode hmsMode;
+
+  private MiniKuduCluster(boolean enableKerberos,
+      int numMasters,
+      int numTservers,
+      List<String> extraTserverFlags,
+      List<String> extraMasterFlags,
+      MiniKdcOptionsPB kdcOptionsPb,
+      String clusterRoot,
+      Common.HmsMode hmsMode) {
+    this.enableKerberos = enableKerberos;
+    this.numMasters = numMasters;
+    this.numTservers = numTservers;
+    this.extraTserverFlags = ImmutableList.copyOf(extraTserverFlags);
+    this.extraMasterFlags = ImmutableList.copyOf(extraMasterFlags);
+    this.kdcOptionsPb = kdcOptionsPb;
+    this.hmsMode = hmsMode;
+
+    if (clusterRoot == null) {
+      // If a cluster root was not set, create a  unique temp directory to use.
+      // The mini cluster will clean this directory up on exit.
+      try {
+        File tempRoot = getTempDirectory("mini-kudu-cluster");
+        this.clusterRoot = tempRoot.toString();
+      } catch (IOException ex) {
+        throw new RuntimeException("Could not create cluster root directory", ex);
+      }
+    } else {
+      this.clusterRoot = clusterRoot;
+    }
+  }
+
+  // Match the C++ MiniCluster test functionality for overriding the tmp directory used.
+  // See MakeClusterRoot in src/kudu/tools/tool_action_test.cc.
+  // If the TEST_TMPDIR environment variable is defined that directory will be used
+  // instead of the default temp directory.
+  private File getTempDirectory(String prefix) throws IOException  {
+    String testTmpdir = System.getenv("TEST_TMPDIR");
+    if (testTmpdir != null) {
+      LOG.info("Using the temp directory defined by TEST_TMPDIR: " + testTmpdir);
+      return Files.createTempDirectory(Paths.get(testTmpdir), prefix).toFile();
+    } else {
+      return Files.createTempDirectory(prefix).toFile();
+    }
+  }
+
+  /**
+   * Sends a command to the control shell and receives its response.
+   * <p>
+   * The method is synchronized to prevent interleaving of requests and responses.
+   * @param req control shell request
+   * @return control shell response
+   * @throws IOException if there was some kind of transport error, or if the
+   *                     response indicates an error
+   */
+  private synchronized ControlShellResponsePB sendRequestToCluster(ControlShellRequestPB req)
+      throws IOException {
+    // Send the request's size (4 bytes, big endian) followed by the request.
+    LOG.debug("Request: {}", req);
+    miniClusterStdin.writeInt(req.getSerializedSize());
+    miniClusterStdin.write(req.toByteArray());
+    miniClusterStdin.flush();
+
+    // Read the response's size (4 bytes, big endian) followed by the response.
+    int respLength = miniClusterStdout.readInt();
+    byte[] respBody = new byte[respLength];
+    miniClusterStdout.readFully(respBody);
+    ControlShellResponsePB resp = ControlShellResponsePB.parseFrom(respBody);
+    LOG.debug("Response: {}", resp);
+
+    // Convert any error into an exception.
+    if (resp.hasError()) {
+      throw new IOException(resp.getError().getMessage());
+    }
+    return resp;
+  }
+
+  /**
+   * Starts this Kudu cluster.
+   * @throws IOException if something went wrong in transit
+   */
+  private void start() throws IOException {
+    Preconditions.checkArgument(numMasters > 0, "Need at least one master");
+
+    // Start the control shell and the communication channel to it.
+    List<String> commandLine = Lists.newArrayList(
+        KuduBinaryLocator.findBinary("kudu"),
+        "test",
+        "mini_cluster",
+        "--serialization=pb");
+    LOG.info("Starting process: {}", commandLine);
+    ProcessBuilder processBuilder = new ProcessBuilder(commandLine);
+    miniCluster = processBuilder.start();
+    miniClusterStdin = new DataOutputStream(miniCluster.getOutputStream());
+    miniClusterStdout = new DataInputStream(miniCluster.getInputStream());
+
+    // Set up a thread that logs stderr from the control shell; this will
+    // include all cluster logging.
+    ProcessInputStreamLogPrinterRunnable printer =
+        new ProcessInputStreamLogPrinterRunnable(miniCluster.getErrorStream());
+    miniClusterErrorPrinter = new Thread(printer);
+    miniClusterErrorPrinter.setDaemon(true);
+    miniClusterErrorPrinter.setName("cluster stderr printer");
+    miniClusterErrorPrinter.start();
+
+    // Create and start the cluster.
+    sendRequestToCluster(
+        ControlShellRequestPB.newBuilder()
+        .setCreateCluster(CreateClusterRequestPB.newBuilder()
+            .setNumMasters(numMasters)
+            .setNumTservers(numTservers)
+            .setEnableKerberos(enableKerberos)
+            .setHmsMode(hmsMode)
+            .addAllExtraMasterFlags(extraMasterFlags)
+            .addAllExtraTserverFlags(extraTserverFlags)
+            .setMiniKdcOptions(kdcOptionsPb)
+            .setClusterRoot(clusterRoot)
+            .build())
+        .build());
+    sendRequestToCluster(
+        ControlShellRequestPB.newBuilder()
+        .setStartCluster(StartClusterRequestPB.newBuilder().build())
+        .build());
+
+    // If the cluster is Kerberized, retrieve the KDC's environment variables
+    // and adapt them into certain security-related system properties.
+    if (enableKerberos) {
+      ControlShellResponsePB resp = sendRequestToCluster(
+          ControlShellRequestPB.newBuilder()
+          .setGetKdcEnvVars(GetKDCEnvVarsRequestPB.newBuilder().build())
+          .build());
+      for (Map.Entry<String, String> e : resp.getGetKdcEnvVars().getEnvVarsMap().entrySet()) {
+        if (e.getKey().equals("KRB5_CONFIG")) {
+          System.setProperty("java.security.krb5.conf", e.getValue());
+        } else if (e.getKey().equals("KRB5CCNAME")) {
+          System.setProperty(SecurityUtil.KUDU_TICKETCACHE_PROPERTY, e.getValue());
+        }
+      }
+    }
+
+    // Initialize the maps of master and tablet servers.
+    ControlShellResponsePB resp = sendRequestToCluster(
+        ControlShellRequestPB.newBuilder()
+        .setGetMasters(GetMastersRequestPB.newBuilder().build())
+        .build());
+    for (DaemonInfoPB info : resp.getGetMasters().getMastersList()) {
+      DaemonInfo d = new DaemonInfo();
+      d.id = info.getId();
+      d.isRunning = true;
+      masterServers.put(ProtobufHelper.hostAndPortFromPB(info.getBoundRpcAddress()), d);
+    }
+    resp = sendRequestToCluster(
+        ControlShellRequestPB.newBuilder()
+        .setGetTservers(GetTServersRequestPB.newBuilder().build())
+        .build());
+    for (DaemonInfoPB info : resp.getGetTservers().getTserversList()) {
+      DaemonInfo d = new DaemonInfo();
+      d.id = info.getId();
+      d.isRunning = true;
+      tabletServers.put(ProtobufHelper.hostAndPortFromPB(info.getBoundRpcAddress()), d);
+    }
+  }
+
+  /**
+   * @return comma-separated list of master server addresses
+   */
+  public String getMasterAddressesAsString() {
+    return Joiner.on(',').join(masterServers.keySet());
+  }
+
+  /**
+   * @return the list of master servers
+   */
+  public List<HostAndPort> getMasterServers() {
+    return new ArrayList(masterServers.keySet());
+  }
+
+  /**
+   * @return the list of tablet servers
+   */
+  public List<HostAndPort> getTabletServers() {
+    return new ArrayList(tabletServers.keySet());
+  }
+
+  /**
+   * Starts a master identified by a host and port.
+   * Does nothing if the server was already running.
+   *
+   * @param hp unique host and port identifying the server
+   * @throws IOException if something went wrong in transit
+   */
+  public void startMasterServer(HostAndPort hp) throws IOException {
+    DaemonInfo d = getMasterServer(hp);
+    if (d.isRunning) {
+      return;
+    }
+    LOG.info("Starting master server {}", hp);
+    sendRequestToCluster(ControlShellRequestPB.newBuilder()
+        .setStartDaemon(StartDaemonRequestPB.newBuilder().setId(d.id).build())
+        .build());
+    d.isRunning = true;
+  }
+
+  /**
+   * Kills a master identified identified by an host and port.
+   * Does nothing if the master was already dead.
+   *
+   * @param hp unique host and port identifying the server
+   * @throws IOException if something went wrong in transit
+   */
+  public void killMasterServer(HostAndPort hp) throws IOException {
+    DaemonInfo d = getMasterServer(hp);
+    if (!d.isRunning) {
+      return;
+    }
+    LOG.info("Killing master server {}", hp);
+    sendRequestToCluster(ControlShellRequestPB.newBuilder()
+        .setStopDaemon(StopDaemonRequestPB.newBuilder().setId(d.id).build())
+        .build());
+    d.isRunning = false;
+  }
+
+  /**
+   * Starts a tablet server identified by an host and port.
+   * Does nothing if the server was already running.
+   *
+   * @param hp unique host and port identifying the server
+   * @throws IOException if something went wrong in transit
+   */
+  public void startTabletServer(HostAndPort hp) throws IOException {
+    DaemonInfo d = getTabletServer(hp);
+    if (d.isRunning) {
+      return;
+    }
+    LOG.info("Starting tablet server {}", hp);
+    sendRequestToCluster(ControlShellRequestPB.newBuilder()
+        .setStartDaemon(StartDaemonRequestPB.newBuilder().setId(d.id).build())
+        .build());
+    d.isRunning = true;
+  }
+
+  /**
+   * Kills a tablet server identified by an host and port.
+   * Does nothing if the tablet server was already dead.
+   *
+   * @param hp unique host and port identifying the server
+   * @throws IOException if something went wrong in transit
+   */
+  public void killTabletServer(HostAndPort hp) throws IOException {
+    DaemonInfo d = getTabletServer(hp);
+    if (!d.isRunning) {
+      return;
+    }
+    LOG.info("Killing tablet server {}", hp);
+    sendRequestToCluster(ControlShellRequestPB.newBuilder()
+        .setStopDaemon(StopDaemonRequestPB.newBuilder().setId(d.id).build())
+        .build());
+    d.isRunning = false;
+  }
+
+  /**
+   * Kills all the master servers.
+   * Does nothing to the servers that are already dead.
+   *
+   * @throws IOException if something went wrong in transit
+   */
+  public void killAllMasterServers() throws IOException {
+    for (Map.Entry<HostAndPort, DaemonInfo> e : masterServers.entrySet()) {
+      killMasterServer(e.getKey());
+    }
+  }
+
+  /**
+   * Starts all the master servers.
+   * Does nothing to the servers that are already running.
+   *
+   * @throws IOException if something went wrong in transit
+   */
+  public void startAllMasterServers() throws IOException {
+    for (Map.Entry<HostAndPort, DaemonInfo> e : masterServers.entrySet()) {
+      startMasterServer(e.getKey());
+    }
+  }
+
+  /**
+   * Kills all tablet servers.
+   * Does nothing to the servers that are already dead.
+   *
+   * @throws IOException if something went wrong in transit
+   */
+  public void killAllTabletServers() throws IOException {
+    for (Map.Entry<HostAndPort, DaemonInfo> e : tabletServers.entrySet()) {
+      killTabletServer(e.getKey());
+    }
+  }
+
+  /**
+   * Starts all the tablet servers.
+   * Does nothing to the servers that are already running.
+   *
+   * @throws IOException if something went wrong in transit
+   */
+  public void startAllTabletServers() throws IOException {
+    for (Map.Entry<HostAndPort, DaemonInfo> e : tabletServers.entrySet()) {
+      startTabletServer(e.getKey());
+    }
+  }
+
+  /**
+   * Removes all credentials for all principals from the Kerberos credential cache.
+   */
+  public void kdestroy() throws IOException {
+    LOG.info("Destroying all Kerberos credentials");
+    sendRequestToCluster(ControlShellRequestPB.newBuilder()
+        .setKdestroy(KdestroyRequestPB.getDefaultInstance())
+        .build());
+  }
+
+  /**
+   * Re-initialize Kerberos credentials for the given username, writing them
+   * into the Kerberos credential cache.
+   * @param username the username to kinit as
+   */
+  public void kinit(String username) throws IOException {
+    LOG.info("Running kinit for user {}", username);
+    sendRequestToCluster(ControlShellRequestPB.newBuilder()
+        .setKinit(KinitRequestPB.newBuilder().setUsername(username).build())
+        .build());
+  }
+
+
+  /** {@override} */
+  @Override
+  public void close() {
+    shutdown();
+  }
+
+  /**
+   * Shuts down a Kudu cluster.
+   */
+  public void shutdown() {
+    // Closing stdin should cause the control shell process to terminate.
+    if (miniClusterStdin != null) {
+      try {
+        miniClusterStdin.close();
+      } catch (IOException e) {
+        LOG.info("Caught exception while closing minicluster stdin", e);
+      }
+    }
+    if (miniClusterStdout != null) {
+      try {
+        miniClusterStdout.close();
+      } catch (IOException e) {
+        LOG.info("Caught exception while closing minicluster stdout", e);
+      }
+    }
+    if (miniClusterErrorPrinter != null) {
+      try {
+        miniClusterErrorPrinter.join();
+      } catch (InterruptedException e) {
+        LOG.info("Caught exception while closing minicluster stderr", e);
+      }
+    }
+    if (miniCluster != null) {
+      try {
+        miniCluster.waitFor();
+      } catch (InterruptedException e) {
+        LOG.warn("Minicluster process did not exit, destroying");
+        miniCluster.destroy();
+      }
+    }
+  }
+
+  /**
+   * Returns a master server identified by an address.
+   *
+   * @param hp unique host and port identifying the server
+   * @return the DaemonInfo of the server
+   * @throws RuntimeException if the server is not found
+   */
+  private DaemonInfo getMasterServer(HostAndPort hp) throws RuntimeException {
+    DaemonInfo d = masterServers.get(hp);
+    if (d == null) {
+      throw new RuntimeException(String.format("Master server %s not found", hp));
+    }
+    return d;
+  }
+
+  /**
+   * Returns a tablet server identified by an address.
+   *
+   * @param hp unique host and port identifying the server
+   * @return the DaemonInfo of the server
+   * @throws RuntimeException if the server is not found
+   */
+  private DaemonInfo getTabletServer(HostAndPort hp) throws RuntimeException {
+    DaemonInfo d = tabletServers.get(hp);
+    if (d == null) {
+      throw new RuntimeException(String.format("Tablet server %s not found", hp));
+    }
+    return d;
+  }
+
+  /**
+   * @return path to the mini cluster root directory
+   */
+  public String getClusterRoot() {
+    return clusterRoot;
+  }
+
+  /**
+   * Helper runnable that receives stderr and logs it along with the process' identifier.
+   */
+  public static class ProcessInputStreamLogPrinterRunnable implements Runnable {
+
+    private final InputStream is;
+
+    public ProcessInputStreamLogPrinterRunnable(InputStream is) {
+      this.is = is;
+    }
+
+    @Override
+    public void run() {
+      try {
+        String line;
+        BufferedReader in = new BufferedReader(
+            new InputStreamReader(is, UTF_8));
+        while ((line = in.readLine()) != null) {
+          LOG.info(line);
+        }
+        in.close();
+      } catch (Exception e) {
+        if (!e.getMessage().contains("Stream closed")) {
+          LOG.error("Caught error while reading a process' output", e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Builder for {@link MiniKuduCluster}
+   */
+  public static class MiniKuduClusterBuilder {
+
+    private int numMasterServers = 1;
+    private int numTabletServers = 3;
+    private boolean enableKerberos = false;
+    private final List<String> extraTabletServerFlags = new ArrayList<>();
+    private final List<String> extraMasterServerFlags = new ArrayList<>();
+    private String clusterRoot = null;
+
+    private MiniKdcOptionsPB.Builder kdcOptionsPb = MiniKdcOptionsPB.newBuilder();
+    private Common.HmsMode hmsMode = Common.HmsMode.NONE;
+
+    public MiniKuduClusterBuilder numMasterServers(int numMasterServers) {
+      this.numMasterServers = numMasterServers;
+      return this;
+    }
+
+    public MiniKuduClusterBuilder numTabletServers(int numTabletServers) {
+      this.numTabletServers = numTabletServers;
+      return this;
+    }
+
+    /**
+     * Enables Kerberos on the mini cluster and acquire client credentials for this process.
+     * @return this instance
+     */
+    public MiniKuduClusterBuilder enableKerberos() {
+      enableKerberos = true;
+      return this;
+    }
+
+    public MiniKuduClusterBuilder enableHiveMetastoreIntegration() {
+      hmsMode = Common.HmsMode.ENABLE_METASTORE_INTEGRATION;
+      return this;
+    }
+
+    /**
+     * Adds a new flag to be passed to the Tablet Server daemons on start.
+     * @return this instance
+     */
+    public MiniKuduClusterBuilder addTabletServerFlag(String flag) {
+      this.extraTabletServerFlags.add(flag);
+      return this;
+    }
+
+    /**
+     * Adds a new flag to be passed to the Master daemons on start.
+     * @return this instance
+     */
+    public MiniKuduClusterBuilder addMasterServerFlag(String flag) {
+      this.extraMasterServerFlags.add(flag);
+      return this;
+    }
+
+    public MiniKuduClusterBuilder kdcTicketLifetime(String lifetime) {
+      this.kdcOptionsPb.setTicketLifetime(lifetime);
+      return this;
+    }
+
+    public MiniKuduClusterBuilder kdcRenewLifetime(String lifetime) {
+      this.kdcOptionsPb.setRenewLifetime(lifetime);
+      return this;
+    }
+
+    /**
+     * Sets the directory where the cluster's data and logs should be placed.
+     * @return this instance
+     */
+    public MiniKuduClusterBuilder clusterRoot(String clusterRoot) {
+      this.clusterRoot = clusterRoot;
+      return this;
+    }
+
+    /**
+     * Builds and starts a new {@link MiniKuduCluster} using builder state.
+     * @return the newly started {@link MiniKuduCluster}
+     * @throws IOException if something went wrong starting the cluster
+     */
+    public MiniKuduCluster build() throws IOException {
+      MiniKuduCluster cluster =
+          new MiniKuduCluster(enableKerberos,
+              numMasterServers, numTabletServers,
+              extraTabletServerFlags, extraMasterServerFlags,
+              kdcOptionsPb.build(), clusterRoot, hmsMode);
+      try {
+        cluster.start();
+      } catch (IOException e) {
+        // MiniKuduCluster.close should not throw, so no need for a nested try/catch.
+        cluster.close();
+        throw e;
+      }
+      return cluster;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/kudu/blob/15f1416f/java/kudu-test-utils/src/main/java/org/apache/kudu/test/junit/AssertHelpers.java
----------------------------------------------------------------------
diff --git a/java/kudu-test-utils/src/main/java/org/apache/kudu/test/junit/AssertHelpers.java b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/junit/AssertHelpers.java
new file mode 100644
index 0000000..9fc2c0c
--- /dev/null
+++ b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/junit/AssertHelpers.java
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.kudu.test.junit;
+
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+import static org.junit.Assert.assertTrue;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class AssertHelpers {
+  public interface BooleanExpression {
+    boolean get() throws Exception;
+  }
+
+  // A looping check. It's mainly useful for scanners, since writes may take a little time to show
+  // up.
+  public static void assertEventuallyTrue(String description, BooleanExpression expression,
+                                          long timeoutMillis) throws Exception {
+    long deadlineNanos = System.nanoTime() + timeoutMillis * 1000000;
+    boolean success;
+
+    do {
+      success = expression.get();
+      if (success) break;
+      Thread.sleep(50); // Sleep for 50ms
+    } while (System.nanoTime() < deadlineNanos);
+
+    assertTrue(description, success);
+  }
+}

http://git-wip-us.apache.org/repos/asf/kudu/blob/15f1416f/java/kudu-test-utils/src/main/java/org/apache/kudu/test/junit/RetryRule.java
----------------------------------------------------------------------
diff --git a/java/kudu-test-utils/src/main/java/org/apache/kudu/test/junit/RetryRule.java b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/junit/RetryRule.java
new file mode 100644
index 0000000..b096a3e
--- /dev/null
+++ b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/junit/RetryRule.java
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.kudu.test.junit;
+
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+import org.junit.rules.TestRule;
+import org.junit.runner.Description;
+import org.junit.runners.model.Statement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A JUnit rule to retry failed tests.
+ * We use this with Gradle because it doesn't support
+ * Surefire/Failsafe rerunFailingTestsCount like Maven does. We use the system
+ * property rerunFailingTestsCount to mimic the maven arguments closely.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class RetryRule implements TestRule {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RetryRule.class);
+  private static final int RETRY_COUNT = Integer.getInteger("rerunFailingTestsCount", 0);
+
+  public RetryRule () {}
+
+  @Override
+  public Statement apply(Statement base, Description description) {
+    return new RetryStatement(base, description, RETRY_COUNT);
+  }
+
+  private static class RetryStatement extends Statement {
+
+    private final Statement base;
+    private final Description description;
+    private final int retryCount;
+
+    RetryStatement(final Statement base, final Description description, final int retryCount) {
+      this.base = base;
+      this.description = description;
+      this.retryCount = retryCount;
+    }
+
+    @Override
+    public void evaluate() throws Throwable {
+      // If there are no retries, just pass through to evaluate as usual.
+      if (retryCount == 0) {
+        base.evaluate();
+        return;
+      }
+
+      // To retry we catch the exception for the evaluate, log a message, and retry.
+      // We track and throw the last failure if all tries fail.
+      Throwable lastException = null;
+      for (int i = 0; i < retryCount; i++) {
+        try {
+          base.evaluate();
+          return;
+        } catch (Throwable t) {
+          lastException = t;
+          LOG.error(description.getDisplayName() + ": failed run " + (i + 1), t);
+        }
+      }
+      LOG.error(description.getDisplayName() + ": giving up after " + retryCount + " failures");
+      throw lastException;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/kudu/blob/15f1416f/java/kudu-test-utils/src/test/java/org/apache/kudu/test/TestMiniKuduCluster.java
----------------------------------------------------------------------
diff --git a/java/kudu-test-utils/src/test/java/org/apache/kudu/test/TestMiniKuduCluster.java b/java/kudu-test-utils/src/test/java/org/apache/kudu/test/TestMiniKuduCluster.java
new file mode 100644
index 0000000..95a3843
--- /dev/null
+++ b/java/kudu-test-utils/src/test/java/org/apache/kudu/test/TestMiniKuduCluster.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+package org.apache.kudu.test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.net.Socket;
+
+import org.apache.kudu.client.DeadlineTracker;
+import org.apache.kudu.client.HostAndPort;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduClient.KuduClientBuilder;
+import org.apache.kudu.client.ListTablesResponse;
+import org.apache.kudu.test.cluster.MiniKuduCluster;
+import org.apache.kudu.test.junit.RetryRule;
+import org.apache.kudu.test.cluster.FakeDNS;
+import org.junit.Rule;
+import org.junit.Test;
+
+public class TestMiniKuduCluster {
+
+  private static final int NUM_TABLET_SERVERS = 3;
+  private static final int NUM_MASTERS = 1;
+  private static final long SLEEP_TIME_MS = 10000;
+
+  @Rule
+  public RetryRule retryRule = new RetryRule();
+
+  @Test(timeout = 50000)
+  public void test() throws Exception {
+    try (MiniKuduCluster cluster = new MiniKuduCluster.MiniKuduClusterBuilder()
+                                                      .numMasterServers(NUM_MASTERS)
+                                                      .numTabletServers(NUM_TABLET_SERVERS)
+                                                      .build()) {
+      assertEquals(NUM_MASTERS, cluster.getMasterServers().size());
+      assertEquals(NUM_TABLET_SERVERS, cluster.getTabletServers().size());
+
+      {
+        // Kill the master.
+        HostAndPort masterHostPort = cluster.getMasterServers().get(0);
+        testHostPort(masterHostPort, true);
+        cluster.killMasterServer(masterHostPort);
+
+        testHostPort(masterHostPort, false);
+
+        // Restart the master.
+        cluster.startMasterServer(masterHostPort);
+
+        // Test we can reach it.
+        testHostPort(masterHostPort, true);
+      }
+
+      {
+        // Kill the first TS.
+        HostAndPort tsHostPort = cluster.getTabletServers().get(0);
+        testHostPort(tsHostPort, true);
+        cluster.killTabletServer(tsHostPort);
+
+        testHostPort(tsHostPort, false);
+
+        // Restart it.
+        cluster.startTabletServer(tsHostPort);
+
+        testHostPort(tsHostPort, true);
+      }
+    }
+  }
+
+  @Test(timeout = 50000)
+  public void testKerberos() throws Exception {
+    FakeDNS.getInstance().install();
+    try (MiniKuduCluster cluster = new MiniKuduCluster.MiniKuduClusterBuilder()
+                                                      .numMasterServers(NUM_MASTERS)
+                                                      .numTabletServers(NUM_TABLET_SERVERS)
+                                                      .enableKerberos()
+                                                      .build()) {
+      KuduClient client = new KuduClientBuilder(cluster.getMasterAddressesAsString()).build();
+      ListTablesResponse resp = client.getTablesList();
+      assertTrue(resp.getTablesList().isEmpty());
+      assertNull(client.getHiveMetastoreConfig());
+    }
+  }
+
+  @Test(timeout = 50000)
+  public void testHiveMetastoreIntegration() throws Exception {
+    try (MiniKuduCluster cluster = new MiniKuduCluster.MiniKuduClusterBuilder()
+                                                      .numMasterServers(NUM_MASTERS)
+                                                      .numTabletServers(NUM_TABLET_SERVERS)
+                                                      .enableHiveMetastoreIntegration()
+                                                      .build()) {
+      KuduClient client = new KuduClientBuilder(cluster.getMasterAddressesAsString()).build();
+      assertNotNull(client.getHiveMetastoreConfig());
+    }
+  }
+
+  /**
+   * Test whether the specified host and port is open or closed, waiting up to a certain time.
+   * @param hp the host and port to test
+   * @param testIsOpen true if we should want it to be open, false if we want it closed
+   */
+  private static void testHostPort(HostAndPort hp,
+                                   boolean testIsOpen) throws InterruptedException {
+    DeadlineTracker tracker = new DeadlineTracker();
+    while (tracker.getElapsedMillis() < SLEEP_TIME_MS) {
+      try {
+        Socket socket = new Socket(hp.getHost(), hp.getPort());
+        socket.close();
+        if (testIsOpen) {
+          return;
+        }
+      } catch (IOException e) {
+        if (!testIsOpen) {
+          return;
+        }
+      }
+      Thread.sleep(200);
+    }
+    fail("HostAndPort " + hp + " is still " + (testIsOpen ? "closed " : "open"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/kudu/blob/15f1416f/java/settings.gradle
----------------------------------------------------------------------
diff --git a/java/settings.gradle b/java/settings.gradle
index 6a60213..96ca03f 100644
--- a/java/settings.gradle
+++ b/java/settings.gradle
@@ -28,3 +28,4 @@ include "kudu-jepsen"
 include "kudu-mapreduce"
 include "kudu-spark"
 include "kudu-spark-tools"
+include "kudu-test-utils"


Mime
View raw message