kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From granthe...@apache.org
Subject [1/6] kudu git commit: [test] Move BaseKuduTest to a Junit Rule
Date Tue, 02 Oct 2018 19:01:52 GMT
Repository: kudu
Updated Branches:
  refs/heads/master 6466c0d7d -> 8513685ba


http://git-wip-us.apache.org/repos/asf/kudu/blob/dc8ae799/java/kudu-client/src/test/java/org/apache/kudu/test/KuduTestHarness.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/test/KuduTestHarness.java b/java/kudu-client/src/test/java/org/apache/kudu/test/KuduTestHarness.java
new file mode 100644
index 0000000..eeb9e9c
--- /dev/null
+++ b/java/kudu-client/src/test/java/org/apache/kudu/test/KuduTestHarness.java
@@ -0,0 +1,445 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.kudu.test;
+
+import com.google.common.base.Stopwatch;
+import com.stumbleupon.async.Deferred;
+import org.apache.kudu.Common;
+import org.apache.kudu.client.AsyncKuduClient;
+import org.apache.kudu.client.AsyncKuduClient.AsyncKuduClientBuilder;
+import org.apache.kudu.client.DeadlineTracker;
+import org.apache.kudu.client.FakeDNS;
+import org.apache.kudu.client.HostAndPort;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.LocatedTablet;
+import org.apache.kudu.client.MiniKuduCluster;
+import org.apache.kudu.client.MiniKuduCluster.MiniKuduClusterBuilder;
+import org.apache.kudu.client.RemoteTablet;
+import org.apache.kudu.junit.RetryRule;
+import org.apache.kudu.master.Master;
+import org.apache.kudu.util.RandomUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+import org.junit.rules.ExternalResource;
+import org.junit.runner.Description;
+import org.junit.runners.model.Statement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.fail;
+
+/**
+ * A Junit Rule that manages a Kudu cluster and clients for testing.
+ * This rule also includes utility methods for the cluster
+ * and clients.
+ *
+ * <pre>
+ * public static class TestFoo {
+ *
+ *  &#064;Rule
+ *  public KuduTestHarness harness = new KuduTestHarness();
+ *
+ *  ...
+ * }
+ * </pre>
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class KuduTestHarness extends ExternalResource {
+
+  private static final Logger LOG = LoggerFactory.getLogger(KuduTestHarness.class);
+
+  private static final int NUM_MASTER_SERVERS = 3;
+  private static final int NUM_TABLET_SERVERS = 3;
+
+  // Default timeout/sleep interval for various client operations,
+  // waiting for various jobs/threads to complete, etc.
+  public static final int DEFAULT_SLEEP = 50000;
+
+  private final Random randomForTSRestart = RandomUtils.getRandom();
+
+  private MiniKuduClusterBuilder clusterBuilder;
+  private MiniKuduCluster miniCluster;
+
+  // We create both versions of the asyncClient for ease of use.
+  private AsyncKuduClient asyncClient;
+  private KuduClient client;
+
+  public KuduTestHarness(final MiniKuduClusterBuilder clusterBuilder) {
+    this.clusterBuilder = clusterBuilder;
+  }
+
+  public KuduTestHarness() {
+    this.clusterBuilder = getBaseClusterBuilder();
+  }
+
+  /**
+   * Returns the base MiniKuduClusterBuilder used when creating a
+   * KuduTestHarness with the default constructor. This is useful
+   * if you want to add to the default cluster setup.
+   */
+  public static MiniKuduClusterBuilder getBaseClusterBuilder() {
+    return new MiniKuduClusterBuilder()
+        .numMasterServers(NUM_MASTER_SERVERS)
+        .numTabletServers(NUM_TABLET_SERVERS);
+  }
+
+  @Override
+  public Statement apply(Statement base, Description description) {
+    // Set any master server flags defined in the method level annotation.
+    MasterServerConfig masterServerConfig = description.getAnnotation(MasterServerConfig.class);
+    if (masterServerConfig != null) {
+      for (String flag : masterServerConfig.flags()) {
+        clusterBuilder.addMasterServerFlag(flag);
+      }
+    }
+    // Set any tablet server flags defined in the method level annotation.
+    TabletServerConfig tabletServerConfig = description.getAnnotation(TabletServerConfig.class);
+    if (tabletServerConfig != null) {
+      for (String flag : tabletServerConfig.flags()) {
+        clusterBuilder.addTabletServerFlag(flag);
+      }
+    }
+
+    // Generate the ExternalResource Statement.
+    Statement statement = super.apply(base, description);
+    // Wrap in the RetryRule to rerun flaky tests.
+    return new RetryRule().apply(statement, description);
+  }
+
+  @Override
+  public void before() throws Exception {
+    FakeDNS.getInstance().install();
+    LOG.info("Creating a new MiniKuduCluster...");
+    miniCluster = clusterBuilder.build();
+    LOG.info("Creating a new Kudu client...");
+    asyncClient = new AsyncKuduClientBuilder(miniCluster.getMasterAddressesAsString())
+        .defaultAdminOperationTimeoutMs(DEFAULT_SLEEP)
+        .build();
+    client = asyncClient.syncClient();
+  }
+
+  @Override
+  public void after() {
+    try {
+      if (client != null) {
+        client.shutdown();
+        // No need to explicitly shutdown the async client,
+        // shutting down the sync client effectively does that.
+      }
+    } catch (KuduException e) {
+      LOG.warn("Error while shutting down the test client");
+    } finally {
+      if (miniCluster != null) {
+        miniCluster.shutdown();
+      }
+    }
+  }
+
+  public KuduClient getClient() {
+    return client;
+  }
+
+  public AsyncKuduClient getAsyncClient() {
+    return asyncClient;
+  }
+
+  /**
+   * Helper method to easily kill a tablet server that serves the given table's only tablet's
+   * leader. The currently running test case will be failed if there's more than one tablet,
+   * if the tablet has no leader after some retries, or if the tablet server was already killed.
+   *
+   * This method is thread-safe.
+   * @param table a KuduTable which will get its single tablet's leader killed.
+   * @throws Exception
+   */
+  public void killTabletLeader(KuduTable table) throws Exception {
+    List<LocatedTablet> tablets = table.getTabletsLocations(DEFAULT_SLEEP);
+    if (tablets.isEmpty() || tablets.size() > 1) {
+      fail("Currently only support killing leaders for tables containing 1 tablet, table " +
+          table.getName() + " has " + tablets.size());
+    }
+    LocatedTablet tablet = tablets.get(0);
+    if (tablet.getReplicas().size() == 1) {
+      fail("Table " + table.getName() + " only has 1 tablet, please enable replication");
+    }
+
+    HostAndPort hp = findLeaderTabletServer(tablet);
+    miniCluster.killTabletServer(hp);
+  }
+
+  /**
+   * Helper method to kill a tablet server that serves the given tablet's
+   * leader. The currently running test case will be failed if the tablet has no
+   * leader after some retries, or if the tablet server was already killed.
+   *
+   * This method is thread-safe.
+   * @param tablet a RemoteTablet which will get its leader killed
+   * @throws Exception
+   */
+  public void killTabletLeader(RemoteTablet tablet) throws Exception {
+    killTabletLeader(new LocatedTablet(tablet));
+  }
+
+  /**
+   * Helper method to kill a tablet server that serves the given tablet's
+   * leader. The currently running test case will be failed if the tablet has no
+   * leader after some retries, or if the tablet server was already killed.
+   *
+   * This method is thread-safe.
+   * @param tablet a LocatedTablet which will get its leader killed
+   * @throws Exception
+   */
+  public void killTabletLeader(LocatedTablet tablet) throws Exception {
+    HostAndPort hp = findLeaderTabletServer(tablet);
+    miniCluster.killTabletServer(hp);
+  }
+
+  /**
+   * Finds the RPC port of the given tablet's leader tserver.
+   * @param tablet a LocatedTablet
+   * @return the host and port of the given tablet's leader tserver
+   * @throws Exception if we are unable to find the leader tserver
+   */
+  public HostAndPort findLeaderTabletServer(LocatedTablet tablet)
+      throws Exception {
+    LocatedTablet.Replica leader = null;
+    DeadlineTracker deadlineTracker = new DeadlineTracker();
+    deadlineTracker.setDeadline(DEFAULT_SLEEP);
+    while (leader == null) {
+      if (deadlineTracker.timedOut()) {
+        fail("Timed out while trying to find a leader for this table");
+      }
+
+      leader = tablet.getLeaderReplica();
+      if (leader == null) {
+        LOG.info("Sleeping while waiting for a tablet LEADER to arise, currently slept {} ms",
+            deadlineTracker.getElapsedMillis());
+        Thread.sleep(50);
+      }
+    }
+    return new HostAndPort(leader.getRpcHost(), leader.getRpcPort());
+  }
+
+  /**
+   * Helper method to easily kill the leader master.
+   *
+   * This method is thread-safe.
+   * @throws Exception if there is an error finding or killing the leader master.
+   */
+  public void killLeaderMasterServer() throws Exception {
+    HostAndPort hp = findLeaderMasterServer();
+    miniCluster.killMasterServer(hp);
+  }
+
+  /**
+   * Find the host and port of the leader master.
+   * @return the host and port of the leader master
+   * @throws Exception if we are unable to find the leader master
+   */
+  public HostAndPort findLeaderMasterServer() throws Exception {
+    Stopwatch sw = Stopwatch.createStarted();
+    while (sw.elapsed(TimeUnit.MILLISECONDS) < DEFAULT_SLEEP) {
+      Deferred<Master.GetTableLocationsResponsePB> masterLocD =
+          asyncClient.getMasterTableLocationsPB(null);
+      Master.GetTableLocationsResponsePB r = masterLocD.join(DEFAULT_SLEEP);
+      Common.HostPortPB pb = r.getTabletLocations(0)
+          .getReplicas(0)
+          .getTsInfo()
+          .getRpcAddresses(0);
+      if (pb.getPort() != -1) {
+        return new HostAndPort(pb.getHost(), pb.getPort());
+      }
+    }
+    throw new IOException(String.format("No leader master found after %d ms", DEFAULT_SLEEP));
+  }
+
+  /**
+   * Picks at random a tablet server that serves tablets from the passed table and restarts it.
+   * @param table table to query for a TS to restart
+   * @throws Exception
+   */
+  public void restartTabletServer(KuduTable table) throws Exception {
+    List<LocatedTablet> tablets = table.getTabletsLocations(DEFAULT_SLEEP);
+    if (tablets.isEmpty()) {
+      fail("Table " + table.getName() + " doesn't have any tablets");
+    }
+
+    LocatedTablet tablet = tablets.get(0);
+    LocatedTablet.Replica replica =
+        tablet.getReplicas().get(randomForTSRestart.nextInt(tablet.getReplicas().size()));
+    HostAndPort hp = new HostAndPort(replica.getRpcHost(), replica.getRpcPort());
+    miniCluster.killTabletServer(hp);
+    miniCluster.startTabletServer(hp);
+  }
+
+  /**
+   * Kills a tablet server that serves the given tablet's leader and restarts it.
+   * @param tablet a RemoteTablet which will get its leader killed and restarted
+   * @throws Exception
+   */
+  public void restartTabletServer(RemoteTablet tablet) throws Exception {
+    HostAndPort hp = findLeaderTabletServer(new LocatedTablet(tablet));
+    miniCluster.killTabletServer(hp);
+    miniCluster.startTabletServer(hp);
+  }
+
+  /**
+   * Kills and restarts the leader master.
+   * @throws Exception
+   */
+  public void restartLeaderMaster() throws Exception {
+    HostAndPort hp = findLeaderMasterServer();
+    miniCluster.killMasterServer(hp);
+    miniCluster.startMasterServer(hp);
+  }
+
+  /**
+   * Return the comma-separated list of "host:port" pairs that describes the master
+   * config for this cluster.
+   * @return The master config string.
+   */
+  public String getMasterAddressesAsString() {
+    return miniCluster.getMasterAddressesAsString();
+  }
+
+  /**
+   * @return the list of master servers
+   */
+  public List<HostAndPort> getMasterServers() {
+    return miniCluster.getMasterServers();
+  }
+
+  /**
+   * @return the list of tablet servers
+   */
+  public List<HostAndPort> getTabletServers() {
+    return miniCluster.getMasterServers();
+  }
+
+  /**
+   * @return path to the mini cluster root directory
+   */
+  public String getClusterRoot() {
+    return miniCluster.getClusterRoot();
+  }
+
+  /**
+   * Kills all the master servers.
+   * Does nothing to the servers that are already dead.
+   *
+   * @throws IOException
+   */
+  public void killAllMasterServers() throws IOException {
+    miniCluster.killAllMasterServers();
+  }
+
+  /**
+   * Starts all the master servers.
+   * Does nothing to the servers that are already running.
+   *
+   * @throws IOException
+   */
+  public void startAllMasterServers() throws IOException {
+    miniCluster.startAllMasterServers();
+  }
+
+  /**
+   * Kills all the tablet servers.
+   * Does nothing to the servers that are already dead.
+   *
+   * @throws IOException
+   */
+  public void killAllTabletServers() throws IOException {
+    miniCluster.killAllTabletServers();
+  }
+
+  /**
+   * Starts all the tablet servers.
+   * Does nothing to the servers that are already running.
+   *
+   * @throws IOException
+   */
+  public void startAllTabletServers() throws IOException {
+    miniCluster.startAllTabletServers();
+  }
+
+  /**
+   * Removes all credentials for all principals from the Kerberos credential cache.
+   */
+  public void kdestroy() throws IOException {
+    miniCluster.kdestroy();
+  }
+
+  /**
+   * Re-initialize Kerberos credentials for the given username, writing them
+   * into the Kerberos credential cache.
+   * @param username the username to kinit as
+   */
+  public void kinit(String username) throws IOException {
+    miniCluster.kinit(username);
+  }
+
+  /**
+   * Resets the clients so that their state is completely fresh, including meta
+   * cache, connections, open tables, sessions and scanners, and propagated timestamp.
+   */
+  public void resetClients() throws IOException {
+    client.shutdown();
+    asyncClient = new AsyncKuduClientBuilder(miniCluster.getMasterAddressesAsString())
+        .defaultAdminOperationTimeoutMs(DEFAULT_SLEEP)
+        .build();
+    client = asyncClient.syncClient();
+  }
+
+  /**
+   * An annotation that can be added to each test method to
+   * define additional master server flags to be used when
+   * creating the test cluster.
+   *
+   * ex: @MasterServerConfig(flags = { "key1=valA", "key2=valB" })
+   */
+  @Retention(RetentionPolicy.RUNTIME)
+  @Target({ElementType.METHOD})
+  public @interface MasterServerConfig {
+    String[] flags();
+  }
+
+  /**
+   * An annotation that can be added to each test method to
+   * define additional tablet server flags to be used when
+   * creating the test cluster.
+   *
+   * ex: @TabletServerConfig(flags = { "key1=valA", "key2=valB" })
+   */
+  @Retention(RetentionPolicy.RUNTIME)
+  @Target({ElementType.METHOD})
+  public @interface TabletServerConfig {
+    String[] flags();
+  }
+}

http://git-wip-us.apache.org/repos/asf/kudu/blob/dc8ae799/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducerTest.java
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducerTest.java b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducerTest.java
index 9f200b8..91bc339 100644
--- a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducerTest.java
+++ b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducerTest.java
@@ -49,18 +49,18 @@ import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.event.EventBuilder;
-
+import org.junit.Rule;
 import org.junit.Test;
 
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.Schema;
 import org.apache.kudu.Type;
-import org.apache.kudu.client.BaseKuduTest;
 import org.apache.kudu.client.CreateTableOptions;
 import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.test.KuduTestHarness;
 import org.apache.kudu.util.DecimalUtil;
 
-public class AvroKuduOperationsProducerTest extends BaseKuduTest {
+public class AvroKuduOperationsProducerTest {
   private static String schemaUriString;
   private static String schemaLiteral;
 
@@ -80,6 +80,9 @@ public class AvroKuduOperationsProducerTest extends BaseKuduTest {
     GLOBAL, URL, LITERAL
   }
 
+  @Rule
+  public KuduTestHarness harness = new KuduTestHarness();
+
   @Test
   public void testEmptyChannel() throws Exception {
     testEvents(0, SchemaLocation.GLOBAL);
@@ -116,7 +119,7 @@ public class AvroKuduOperationsProducerTest extends BaseKuduTest {
 
     List<Event> events = generateEvents(eventCount, schemaLocation);
 
-    KuduSinkTestUtil.processEventsCreatingSink(syncClient, context, tableName, events);
+    KuduSinkTestUtil.processEventsCreatingSink(harness.getClient(), context, tableName, events);
 
     List<String> answers = makeAnswers(eventCount);
     List<String> rows = scanTableToStrings(table);
@@ -137,7 +140,7 @@ public class AvroKuduOperationsProducerTest extends BaseKuduTest {
     CreateTableOptions createOptions =
         new CreateTableOptions().setRangePartitionColumns(ImmutableList.of("key"))
             .setNumReplicas(1);
-    return createTable(tableName, new Schema(columns), createOptions);
+    return harness.getClient().createTable(tableName, new Schema(columns), createOptions);
   }
 
   private List<Event> generateEvents(int eventCount,

http://git-wip-us.apache.org/repos/asf/kudu/blob/dc8ae799/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KeyedKuduOperationsProducerTest.java
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KeyedKuduOperationsProducerTest.java b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KeyedKuduOperationsProducerTest.java
index 1940369..8914b06 100644
--- a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KeyedKuduOperationsProducerTest.java
+++ b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KeyedKuduOperationsProducerTest.java
@@ -16,7 +16,6 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.kudu.flume.sink;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
@@ -37,6 +36,7 @@ import com.google.common.collect.ImmutableMap;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.event.EventBuilder;
+import org.junit.Rule;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,13 +44,16 @@ import org.slf4j.LoggerFactory;
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.Schema;
 import org.apache.kudu.Type;
-import org.apache.kudu.client.BaseKuduTest;
 import org.apache.kudu.client.CreateTableOptions;
 import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.test.KuduTestHarness;
 
-public class KeyedKuduOperationsProducerTest extends BaseKuduTest {
+public class KeyedKuduOperationsProducerTest {
   private static final Logger LOG = LoggerFactory.getLogger(KeyedKuduOperationsProducerTest.class);
 
+  @Rule
+  public KuduTestHarness harness = new KuduTestHarness();
+
   private KuduTable createNewTable(String tableName) throws Exception {
     LOG.info("Creating new table...");
 
@@ -64,7 +67,8 @@ public class KeyedKuduOperationsProducerTest extends BaseKuduTest {
     CreateTableOptions createOptions =
         new CreateTableOptions().setRangePartitionColumns(ImmutableList.of(KEY_COLUMN_DEFAULT))
                               .setNumReplicas(1);
-    KuduTable table = createTable(tableName, new Schema(columns), createOptions);
+    KuduTable table =
+        harness.getClient().createTable(tableName, new Schema(columns), createOptions);
 
     LOG.info("Created new table.");
 
@@ -111,7 +115,7 @@ public class KeyedKuduOperationsProducerTest extends BaseKuduTest {
         PRODUCER_PREFIX + OPERATION_PROP, "upsert",
         PRODUCER, SimpleKeyedKuduOperationsProducer.class.getName()
     ));
-    KuduSink sink = KuduSinkTestUtil.createSink(syncClient, tableName, ctx);
+    KuduSink sink = KuduSinkTestUtil.createSink(harness.getClient(), tableName, ctx);
     sink.start();
 
     int numRows = 3;
@@ -159,7 +163,7 @@ public class KeyedKuduOperationsProducerTest extends BaseKuduTest {
 
     List<Event> events = getEvents(eventCount);
 
-    KuduSinkTestUtil.processEventsCreatingSink(syncClient, context, tableName, events);
+    KuduSinkTestUtil.processEventsCreatingSink(harness.getClient(), context, tableName, events);
 
     List<String> rows = scanTableToStrings(table);
     assertEquals(eventCount + " row(s) expected", eventCount, rows.size());

http://git-wip-us.apache.org/repos/asf/kudu/blob/dc8ae799/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KuduSinkTest.java
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KuduSinkTest.java b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KuduSinkTest.java
index eb5f7c8..3887482 100644
--- a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KuduSinkTest.java
+++ b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KuduSinkTest.java
@@ -16,7 +16,6 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.kudu.flume.sink;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
@@ -41,7 +40,9 @@ import org.apache.flume.Sink.Status;
 import org.apache.flume.Transaction;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.EventBuilder;
+import org.apache.kudu.test.KuduTestHarness;
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,13 +50,15 @@ import org.slf4j.LoggerFactory;
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.Schema;
 import org.apache.kudu.Type;
-import org.apache.kudu.client.BaseKuduTest;
 import org.apache.kudu.client.CreateTableOptions;
 import org.apache.kudu.client.KuduTable;
 
-public class KuduSinkTest extends BaseKuduTest {
+public class KuduSinkTest {
   private static final Logger LOG = LoggerFactory.getLogger(KuduSinkTest.class);
 
+  @Rule
+  public KuduTestHarness harness = new KuduTestHarness();
+
   private KuduTable createNewTable(String tableName) throws Exception {
     LOG.info("Creating new table...");
 
@@ -64,7 +67,7 @@ public class KuduSinkTest extends BaseKuduTest {
     CreateTableOptions createOptions =
         new CreateTableOptions().setRangePartitionColumns(ImmutableList.of("payload"))
                                 .setNumReplicas(1);
-    KuduTable table = createTable(tableName, new Schema(columns), createOptions);
+    KuduTable table = harness.getClient().createTable(tableName, new Schema(columns), createOptions);
 
     LOG.info("Created new table.");
 
@@ -75,7 +78,7 @@ public class KuduSinkTest extends BaseKuduTest {
   public void testMandatoryParameters() {
     LOG.info("Testing mandatory parameters...");
 
-    KuduSink sink = new KuduSink(syncClient);
+    KuduSink sink = new KuduSink(harness.getClient());
 
     HashMap<String, String> parameters = new HashMap<>();
     Context context = new Context(parameters);
@@ -102,7 +105,7 @@ public class KuduSinkTest extends BaseKuduTest {
   public void testMissingTable() {
     LOG.info("Testing missing table...");
 
-    KuduSink sink = KuduSinkTestUtil.createSink(syncClient, "missingTable", new Context());
+    KuduSink sink = KuduSinkTestUtil.createSink(harness.getClient(), "missingTable", new Context());
     sink.start();
 
     LOG.info("Testing missing table finished successfully.");
@@ -139,7 +142,7 @@ public class KuduSinkTest extends BaseKuduTest {
     Context sinkContext = new Context();
     sinkContext.put(KuduSinkConfigurationConstants.IGNORE_DUPLICATE_ROWS,
                     Boolean.toString(ignoreDuplicateRows));
-    KuduSink sink = KuduSinkTestUtil.createSink(syncClient, tableName, sinkContext);
+    KuduSink sink = KuduSinkTestUtil.createSink(harness.getClient(), tableName, sinkContext);
     sink.start();
     Channel channel = sink.getChannel();
 
@@ -193,7 +196,7 @@ public class KuduSinkTest extends BaseKuduTest {
       events.add(e);
     }
 
-    KuduSinkTestUtil.processEventsCreatingSink(syncClient, new Context(), tableName, events);
+    KuduSinkTestUtil.processEventsCreatingSink(harness.getClient(), new Context(), tableName, events);
 
     List<String> rows = scanTableToStrings(table);
     assertEquals(eventCount + " row(s) expected", eventCount, rows.size());

http://git-wip-us.apache.org/repos/asf/kudu/blob/dc8ae799/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducerParseErrorTest.java
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducerParseErrorTest.java b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducerParseErrorTest.java
index 9bc5942..fda478c 100644
--- a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducerParseErrorTest.java
+++ b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducerParseErrorTest.java
@@ -19,6 +19,7 @@ import com.google.common.collect.ImmutableList;
 import org.apache.flume.Context;
 import org.apache.flume.FlumeException;
 import org.apache.flume.event.EventBuilder;
+import org.apache.kudu.test.KuduTestHarness;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -27,12 +28,11 @@ import org.junit.rules.ExpectedException;
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.Schema;
 import org.apache.kudu.Type;
-import org.apache.kudu.client.BaseKuduTest;
 import org.apache.kudu.client.CreateTableOptions;
 import org.apache.kudu.client.KuduTable;
 import org.apache.kudu.util.CapturingLogAppender;
 
-public class RegexpKuduOperationsProducerParseErrorTest extends BaseKuduTest {
+public class RegexpKuduOperationsProducerParseErrorTest {
   private static final String TEST_REGEXP = "(?<key>\\d+),(?<byteFld>\\d+),(?<stringFld>\\w+)";
   private static final String TEST_REGEXP_MISSING_COLUMN = "(?<key>\\d+),(?<byteFld>\\d+)";
   private static final String TEST_OPERATION = "insert";
@@ -54,6 +54,9 @@ public class RegexpKuduOperationsProducerParseErrorTest extends BaseKuduTest {
   private static final String POLICY_IGNORE = "IGNORE";
 
   @Rule
+  public KuduTestHarness harness = new KuduTestHarness();
+
+  @Rule
   public ExpectedException thrown = ExpectedException.none();
 
   @Test
@@ -265,7 +268,7 @@ public class RegexpKuduOperationsProducerParseErrorTest extends BaseKuduTest {
     columns.add(new ColumnSchema.ColumnSchemaBuilder("stringFld", Type.STRING).build());
     CreateTableOptions createOptions =
         new CreateTableOptions().addHashPartitions(ImmutableList.of("key"), 3).setNumReplicas(1);
-    KuduTable table = createTable(tableName, new Schema(columns), createOptions);
+    KuduTable table = harness.getClient().createTable(tableName, new Schema(columns), createOptions);
     return table;
   }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/dc8ae799/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducerTest.java
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducerTest.java b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducerTest.java
index cadfa2e..b5c4e28 100644
--- a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducerTest.java
+++ b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducerTest.java
@@ -16,7 +16,6 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.kudu.flume.sink;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
@@ -35,22 +34,26 @@ import com.google.common.collect.ImmutableList;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.event.EventBuilder;
+import org.apache.kudu.test.KuduTestHarness;
+import org.junit.Rule;
 import org.junit.Test;
 
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.Schema;
 import org.apache.kudu.Type;
-import org.apache.kudu.client.BaseKuduTest;
 import org.apache.kudu.client.CreateTableOptions;
 import org.apache.kudu.client.KuduTable;
 import org.apache.kudu.util.DecimalUtil;
 
-public class RegexpKuduOperationsProducerTest extends BaseKuduTest {
+public class RegexpKuduOperationsProducerTest {
   private static final String TEST_REGEXP =
       "(?<key>\\d+),(?<byteFld>\\d+),(?<shortFld>\\d+),(?<intFld>\\d+)," +
       "(?<longFld>\\d+),(?<binaryFld>\\w+),(?<stringFld>\\w+),(?<boolFld>\\w+)," +
       "(?<floatFld>\\d+\\.\\d*),(?<doubleFld>\\d+.\\d*),(?<decimalFld>\\d+.\\d*)";
 
+  @Rule
+  public KuduTestHarness harness = new KuduTestHarness();
+
   private KuduTable createNewTable(String tableName) throws Exception {
     ArrayList<ColumnSchema> columns = new ArrayList<>(10);
     columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32).key(true).build());
@@ -67,7 +70,7 @@ public class RegexpKuduOperationsProducerTest extends BaseKuduTest {
         .typeAttributes(DecimalUtil.typeAttributes(9, 1)).build());
     CreateTableOptions createOptions =
         new CreateTableOptions().addHashPartitions(ImmutableList.of("key"), 3).setNumReplicas(1);
-    return createTable(tableName, new Schema(columns), createOptions);
+    return harness.getClient().createTable(tableName, new Schema(columns), createOptions);
   }
 
   @Test
@@ -116,7 +119,7 @@ public class RegexpKuduOperationsProducerTest extends BaseKuduTest {
 
     List<Event> events = generateEvents(eventCount, perEventRowCount, operation);
 
-    KuduSinkTestUtil.processEventsCreatingSink(syncClient, context, tableName, events);
+    KuduSinkTestUtil.processEventsCreatingSink(harness.getClient(), context, tableName, events);
 
     List<String> rows = scanTableToStrings(table);
     assertEquals(eventCount * perEventRowCount + " row(s) expected",

http://git-wip-us.apache.org/repos/asf/kudu/blob/dc8ae799/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/SecureKuduSinkTest.java
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/SecureKuduSinkTest.java b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/SecureKuduSinkTest.java
index 7fbfcef..e84e1f9 100644
--- a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/SecureKuduSinkTest.java
+++ b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/SecureKuduSinkTest.java
@@ -33,7 +33,9 @@ import com.google.common.collect.ImmutableList;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.event.EventBuilder;
+import org.apache.kudu.test.KuduTestHarness;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,30 +43,29 @@ import org.slf4j.LoggerFactory;
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.Schema;
 import org.apache.kudu.Type;
-import org.apache.kudu.client.BaseKuduTest;
 import org.apache.kudu.client.CreateTableOptions;
 import org.apache.kudu.client.KuduTable;
 import org.apache.kudu.client.MiniKuduCluster.MiniKuduClusterBuilder;
 
-public class SecureKuduSinkTest extends BaseKuduTest {
+public class SecureKuduSinkTest {
   private static final Logger LOG = LoggerFactory.getLogger(SecureKuduSinkTest.class);
   private static final int TICKET_LIFETIME_SECONDS = 10;
   private static final int RENEWABLE_LIFETIME_SECONDS = 30;
 
+  private static final MiniKuduClusterBuilder clusterBuilder = KuduTestHarness.getBaseClusterBuilder()
+      .kdcTicketLifetime(TICKET_LIFETIME_SECONDS + "s")
+      .kdcRenewLifetime(RENEWABLE_LIFETIME_SECONDS + "s")
+      .enableKerberos();
+
+  @Rule
+  public KuduTestHarness harness = new KuduTestHarness(clusterBuilder);
+
   @Before
   public void clearTicketCacheProperty() {
     // Let Flume authenticate.
     System.clearProperty(KUDU_TICKETCACHE_PROPERTY);
   }
 
-  @Override
-  protected MiniKuduClusterBuilder getMiniClusterBuilder() {
-    return super.getMiniClusterBuilder()
-      .kdcTicketLifetime(TICKET_LIFETIME_SECONDS + "s")
-      .kdcRenewLifetime(RENEWABLE_LIFETIME_SECONDS + "s")
-      .enableKerberos();
-  }
-
   @Test
   public void testEventsWithShortTickets() throws Exception {
     LOG.info("Creating new table...");
@@ -74,11 +75,11 @@ public class SecureKuduSinkTest extends BaseKuduTest {
         new CreateTableOptions().setRangePartitionColumns(ImmutableList.of("payload"))
         .setNumReplicas(1);
     String tableName = "test_long_lived_events";
-    KuduTable table = createTable(tableName, new Schema(columns), createOptions);
+    KuduTable table = harness.getClient().createTable(tableName, new Schema(columns), createOptions);
     LOG.info("Created new table.");
 
     KuduSink sink = KuduSinkTestUtil.createSecureSink(
-        tableName, getMasterAddressesAsString(), getClusterRoot());
+        tableName, harness.getMasterAddressesAsString(), harness.getClusterRoot());
     sink.start();
 
     LOG.info("Testing events at the beginning.");

http://git-wip-us.apache.org/repos/asf/kudu/blob/dc8ae799/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITInputFormatJob.java
----------------------------------------------------------------------
diff --git a/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITInputFormatJob.java b/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITInputFormatJob.java
index cd0e95a..ef175b5 100644
--- a/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITInputFormatJob.java
+++ b/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITInputFormatJob.java
@@ -16,6 +16,7 @@
 // under the License.
 package org.apache.kudu.mapreduce;
 
+import static org.apache.kudu.test.KuduTestHarness.DEFAULT_SLEEP;
 import static org.apache.kudu.util.ClientTestUtil.createFourTabletsTableWithNineRows;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -30,16 +31,19 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.kudu.Schema;
+import org.apache.kudu.test.KuduTestHarness;
+import org.apache.kudu.util.ClientTestUtil;
 import org.junit.After;
+import org.junit.Rule;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.kudu.client.BaseKuduTest;
 import org.apache.kudu.client.KuduPredicate;
 import org.apache.kudu.client.RowResult;
 
-public class ITInputFormatJob extends BaseKuduTest {
+public class ITInputFormatJob {
   private static final Logger LOG = LoggerFactory.getLogger(ITInputFormatJob.class);
 
   private static final String TABLE_NAME =
@@ -47,9 +51,14 @@ public class ITInputFormatJob extends BaseKuduTest {
 
   private static final HadoopTestingUtility HADOOP_UTIL = new HadoopTestingUtility();
 
+  private static final Schema basicSchema = ClientTestUtil.getBasicSchema();
+
   /** Counter enumeration to count the actual rows. */
   private enum Counters { ROWS }
 
+  @Rule
+  public KuduTestHarness harness = new KuduTestHarness();
+
   @After
   public void tearDown() throws Exception {
     HADOOP_UTIL.cleanup();
@@ -59,7 +68,7 @@ public class ITInputFormatJob extends BaseKuduTest {
   @SuppressWarnings("deprecation")
   public void test() throws Exception {
 
-    createFourTabletsTableWithNineRows(client, TABLE_NAME, DEFAULT_SLEEP);
+    createFourTabletsTableWithNineRows(harness.getAsyncClient(), TABLE_NAME, DEFAULT_SLEEP);
 
     JobConf conf = new JobConf();
     HADOOP_UTIL.setupAndGetTestDir(ITInputFormatJob.class.getName(), conf).getAbsolutePath();
@@ -92,7 +101,7 @@ public class ITInputFormatJob extends BaseKuduTest {
             job,
             TABLE_NAME,
             "*",
-            getMasterAddressesAsString())
+            harness.getMasterAddressesAsString())
             .operationTimeoutMs(DEFAULT_SLEEP)
             .addDependencies(false)
             .cacheBlocks(false)

http://git-wip-us.apache.org/repos/asf/kudu/blob/dc8ae799/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITKuduTableInputFormat.java
----------------------------------------------------------------------
diff --git a/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITKuduTableInputFormat.java b/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITKuduTableInputFormat.java
index 3266535..14a0c55 100644
--- a/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITKuduTableInputFormat.java
+++ b/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITKuduTableInputFormat.java
@@ -16,6 +16,7 @@
 // under the License.
 package org.apache.kudu.mapreduce;
 
+import static org.apache.kudu.test.KuduTestHarness.DEFAULT_SLEEP;
 import static org.apache.kudu.util.ClientTestUtil.getBasicCreateTableOptions;
 import static org.apache.kudu.util.ClientTestUtil.getBasicSchema;
 import static org.junit.Assert.assertEquals;
@@ -32,27 +33,31 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.kudu.test.KuduTestHarness;
+import org.junit.Rule;
 import org.junit.Test;
 
 import org.apache.kudu.Schema;
 import org.apache.kudu.client.AsyncKuduSession;
-import org.apache.kudu.client.BaseKuduTest;
 import org.apache.kudu.client.Insert;
 import org.apache.kudu.client.KuduPredicate;
 import org.apache.kudu.client.KuduTable;
 import org.apache.kudu.client.PartialRow;
 import org.apache.kudu.client.RowResult;
 
-public class ITKuduTableInputFormat extends BaseKuduTest {
+public class ITKuduTableInputFormat {
 
   private static final String TABLE_NAME =
       ITKuduTableInputFormat.class.getName() + "-" + System.currentTimeMillis();
 
+  @Rule
+  public KuduTestHarness harness = new KuduTestHarness();
+
   @Test
   public void test() throws Exception {
-    createTable(TABLE_NAME, getBasicSchema(), getBasicCreateTableOptions());
+    harness.getClient().createTable(TABLE_NAME, getBasicSchema(), getBasicCreateTableOptions());
 
-    KuduTable table = openTable(TABLE_NAME);
+    KuduTable table = harness.getClient().openTable(TABLE_NAME);
     Schema schema = getBasicSchema();
     Insert insert = table.newInsert();
     PartialRow row = insert.getRow();
@@ -61,7 +66,7 @@ public class ITKuduTableInputFormat extends BaseKuduTest {
     row.addInt(2, 3);
     row.addString(3, "a string");
     row.addBoolean(4, true);
-    AsyncKuduSession session = client.newSession();
+    AsyncKuduSession session = harness.getAsyncClient().newSession();
     session.apply(insert).join(DEFAULT_SLEEP);
     session.close().join(DEFAULT_SLEEP);
 
@@ -122,7 +127,7 @@ public class ITKuduTableInputFormat extends BaseKuduTest {
         List<KuduPredicate> predicates) throws IOException, InterruptedException {
     KuduTableInputFormat input = new KuduTableInputFormat();
     Configuration conf = new Configuration();
-    conf.set(KuduTableInputFormat.MASTER_ADDRESSES_KEY, getMasterAddressesAsString());
+    conf.set(KuduTableInputFormat.MASTER_ADDRESSES_KEY, harness.getMasterAddressesAsString());
     conf.set(KuduTableInputFormat.INPUT_TABLE_KEY, TABLE_NAME);
     if (columnProjection != null) {
       conf.set(KuduTableInputFormat.COLUMN_PROJECTION_KEY, columnProjection);

http://git-wip-us.apache.org/repos/asf/kudu/blob/dc8ae799/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITKuduTableOutputFormat.java
----------------------------------------------------------------------
diff --git a/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITKuduTableOutputFormat.java b/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITKuduTableOutputFormat.java
index 597461f..f8a2b27 100644
--- a/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITKuduTableOutputFormat.java
+++ b/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITKuduTableOutputFormat.java
@@ -25,27 +25,31 @@ import static org.junit.Assert.assertNotNull;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.kudu.test.KuduTestHarness;
+import org.junit.Rule;
 import org.junit.Test;
 
 import org.apache.kudu.client.AsyncKuduScanner;
-import org.apache.kudu.client.BaseKuduTest;
 import org.apache.kudu.client.Insert;
 import org.apache.kudu.client.KuduTable;
 import org.apache.kudu.client.Operation;
 import org.apache.kudu.client.PartialRow;
 
-public class ITKuduTableOutputFormat extends BaseKuduTest {
+public class ITKuduTableOutputFormat {
 
   private static final String TABLE_NAME =
       ITKuduTableOutputFormat.class.getName() + "-" + System.currentTimeMillis();
 
+  @Rule
+  public KuduTestHarness harness = new KuduTestHarness();
+
   @Test
   public void test() throws Exception {
-    createTable(TABLE_NAME, getBasicSchema(), getBasicCreateTableOptions());
+    harness.getClient().createTable(TABLE_NAME, getBasicSchema(), getBasicCreateTableOptions());
 
     KuduTableOutputFormat output = new KuduTableOutputFormat();
     Configuration conf = new Configuration();
-    conf.set(KuduTableOutputFormat.MASTER_ADDRESSES_KEY, getMasterAddressesAsString());
+    conf.set(KuduTableOutputFormat.MASTER_ADDRESSES_KEY, harness.getMasterAddressesAsString());
     conf.set(KuduTableOutputFormat.OUTPUT_TABLE_KEY, TABLE_NAME);
     output.setConf(conf);
 
@@ -64,7 +68,7 @@ public class ITKuduTableOutputFormat extends BaseKuduTest {
     RecordWriter<NullWritable, Operation> rw = output.getRecordWriter(null);
     rw.write(NullWritable.get(), insert);
     rw.close(null);
-    AsyncKuduScanner.AsyncKuduScannerBuilder builder = client.newScannerBuilder(table);
+    AsyncKuduScanner.AsyncKuduScannerBuilder builder = harness.getAsyncClient().newScannerBuilder(table);
     assertEquals(1, countRowsInScan(builder.build()));
   }
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/dc8ae799/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITOutputFormatJob.java
----------------------------------------------------------------------
diff --git a/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITOutputFormatJob.java b/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITOutputFormatJob.java
index 9c9918d..080f16a 100644
--- a/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITOutputFormatJob.java
+++ b/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITOutputFormatJob.java
@@ -17,6 +17,7 @@
 package org.apache.kudu.mapreduce;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.kudu.test.KuduTestHarness.DEFAULT_SLEEP;
 import static org.apache.kudu.util.ClientTestUtil.countRowsInScan;
 import static org.apache.kudu.util.ClientTestUtil.getBasicCreateTableOptions;
 import static org.apache.kudu.util.ClientTestUtil.getBasicSchema;
@@ -35,28 +36,31 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.kudu.test.KuduTestHarness;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 
 import org.apache.kudu.client.AsyncKuduScanner;
-import org.apache.kudu.client.BaseKuduTest;
 import org.apache.kudu.client.Insert;
 import org.apache.kudu.client.KuduTable;
 import org.apache.kudu.client.Operation;
 import org.apache.kudu.client.PartialRow;
 
-
-public class ITOutputFormatJob extends BaseKuduTest {
+public class ITOutputFormatJob {
 
   private static final String TABLE_NAME =
       ITOutputFormatJob.class.getName() + "-" + System.currentTimeMillis();
 
   private static final HadoopTestingUtility HADOOP_UTIL = new HadoopTestingUtility();
 
+  @Rule
+  public KuduTestHarness harness = new KuduTestHarness();
+
   @Before
   public void setUp() throws Exception {
-    createTable(TABLE_NAME, getBasicSchema(), getBasicCreateTableOptions());
+    harness.getClient().createTable(TABLE_NAME, getBasicSchema(), getBasicCreateTableOptions());
   }
 
   @After
@@ -88,7 +92,7 @@ public class ITOutputFormatJob extends BaseKuduTest {
     new KuduTableMapReduceUtil.TableOutputFormatConfigurator(
         job,
         TABLE_NAME,
-        getMasterAddressesAsString())
+        harness.getMasterAddressesAsString())
         .operationTimeoutMs(DEFAULT_SLEEP)
         .addDependencies(false)
         .configure();
@@ -96,9 +100,9 @@ public class ITOutputFormatJob extends BaseKuduTest {
     assertTrue("Test job did not end properly", job.waitForCompletion(true));
 
     // Make sure the data's there
-    KuduTable table = openTable(TABLE_NAME);
+    KuduTable table = harness.getClient().openTable(TABLE_NAME);
     AsyncKuduScanner.AsyncKuduScannerBuilder builder =
-        client.newScannerBuilder(table);
+        harness.getAsyncClient().newScannerBuilder(table);
     assertEquals(2, countRowsInScan(builder.build()));
   }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/dc8ae799/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala
index 5b5fc1b..af6423b 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala
@@ -129,8 +129,8 @@ trait KuduTestSuite extends JUnitSuite {
   @Before
   def setUpBase(): Unit = {
     miniCluster = new MiniKuduClusterBuilder()
-      .numMasters(1)
-      .numTservers(1)
+      .numMasterServers(1)
+      .numTabletServers(1)
       .build()
 
     ss = SparkSession.builder().config(conf).getOrCreate()


Mime
View raw message