kudu-commits mailing list archives

From a...@apache.org
Subject kudu git commit: tool: new action for running mini-clusters
Date Fri, 06 Oct 2017 22:50:32 GMT
Repository: kudu
Updated Branches:
  refs/heads/master 0e713aadb -> 761ce10b7


tool: new action for running mini-clusters

Maintaining Kudu clients across various languages has been an ongoing
burden. Even when the client is just a thin wrapper around
another client (e.g. Kudu Python bindings), a great deal of work goes into
client testability. In practice, this has meant a bespoke mini cluster
implementation for each language. On the surface this doesn't seem that bad;
we just need to spawn some masters and tservers, right? Well, the work
quickly adds up:

o While the C++ mini cluster is heavily used and has seen many improvements,
  the Java mini cluster has not received the same kind of love, and is less
  robust as a result. KUDU-1976 is a great example of this deficiency.
o With the inclusion of authn came the addition of a "mini KDC", a special
  daemon for Kerberized mini clusters. It was originally implemented in C++
  and ported to Java, but has yet to be ported to the Python client; this is
  one of the obstacles towards porting full authn support to Python.
o Dan has been prototyping Hive Metastore and Sentry integration for Kudu,
  the testing of which will require "mini HMS" and possibly "mini Sentry"
  testing implementations in C++, Java, and eventually, Python.

In sum, good support for non-C++ mini clusters is an ongoing commitment and
requires a great deal of work. This work hasn't always been forthcoming, and
the non-C++ clusters are deficient as a result. But it doesn't have to be
this way! Here's a thought: what if we reused the C++ mini cluster for tests
written in these other languages? We could write a "proxy" application whose
job it is to manage the C++ mini cluster and expose a rudimentary API that's
easily programmable from Java and Python.

This patch attempts to do just that. It adds a "mini_cluster" action to the
Kudu CLI which provides a rudimentary control shell that can be used to spin
up an ExternalMiniCluster. The shell is controlled via a wire protocol over
stdin/stdout. The protocol is protobuf-based with optional JSON encoding.
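
As a rough illustration of the framing a Python test harness would need, here
is a minimal sketch. It assumes what the receive path in this patch shows: in
PB mode each message is a 4-byte big-endian size followed by the serialized
body, and in JSON mode each message is a single newline-terminated line. The
helper names (frame_pb, read_pb_frame, frame_json, read_json_frame) are
hypothetical and not part of Kudu. The JSON key "get_masters" assumes the
protobuf JSON parser accepts the original proto field name, and the sample PB
body is just placeholder bytes rather than a real request.

```python
import io
import json
import struct

# Mirrors kMaxMessageBytes from the patch (1 MiB).
MAX_MESSAGE_BYTES = 1024 * 1024


def read_exact(stream, n):
    """Read exactly n bytes from a binary stream, or raise EOFError."""
    buf = b""
    while len(buf) < n:
        chunk = stream.read(n - len(buf))
        if not chunk:
            raise EOFError("stream closed mid-message")
        buf += chunk
    return buf


def frame_pb(body):
    """PB mode: prefix a serialized message with its 4-byte big-endian size."""
    if len(body) > MAX_MESSAGE_BYTES:
        raise ValueError("message exceeds maximum message size")
    return struct.pack(">I", len(body)) + body


def read_pb_frame(stream):
    """PB mode: read the size prefix, then the body it announces."""
    (size,) = struct.unpack(">I", read_exact(stream, 4))
    if size > MAX_MESSAGE_BYTES:
        raise ValueError("message exceeds maximum message size")
    return read_exact(stream, size)


def frame_json(obj):
    """JSON mode: one message per line, terminated by a newline."""
    return json.dumps(obj).encode("utf-8") + b"\n"


def read_json_frame(stream):
    """JSON mode: read up to the newline and decode the line."""
    line = stream.readline()
    if not line.endswith(b"\n"):
        raise EOFError("stream closed mid-message")
    return json.loads(line)


# In-memory stand-ins for the shell's pipes. A real session uses one
# serialization mode for its whole lifetime, hence two separate streams.
pb_pipe = io.BytesIO(frame_pb(b"\x08\x01"))  # placeholder body bytes
json_pipe = io.BytesIO(frame_json({"get_masters": {}}))
pb_body = read_pb_frame(pb_pipe)
json_msg = read_json_frame(json_pipe)
```

In a real harness the streams would be the stdin/stdout pipes of a
"kudu test mini_cluster" subprocess, and the PB body would come from the
classes generated from tool.proto.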

I should add that I like the idea of shipping "mini_cluster" into production
as part of the CLI, as it helps realize the vision of a single Kudu artifact
that can provide Kudu testability for any integrating product.

Change-Id: I0e693921ef780dc4a06e536c6b7408f7f0b252f6
Reviewed-on: http://gerrit.cloudera.org:8080/7853
Reviewed-by: Dan Burkert <danburkert@apache.org>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/761ce10b
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/761ce10b
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/761ce10b

Branch: refs/heads/master
Commit: 761ce10b797c4f6345f0b84cfe30bcc92e346893
Parents: 0e713aa
Author: Adar Dembo <adar@cloudera.com>
Authored: Sun Aug 27 22:44:34 2017 -0700
Committer: Adar Dembo <adar@cloudera.com>
Committed: Fri Oct 6 22:50:18 2017 +0000

----------------------------------------------------------------------
 src/kudu/mini-cluster/external_mini_cluster.h |  16 +-
 src/kudu/security/test/mini_kdc.cc            |   4 +-
 src/kudu/tools/CMakeLists.txt                 |  56 ++-
 src/kudu/tools/kudu-tool-test.cc              | 326 +++++++++++++++++
 src/kudu/tools/tool.proto                     | 173 +++++++++
 src/kudu/tools/tool_action.h                  |   1 +
 src/kudu/tools/tool_action_common.cc          | 188 +++++++++-
 src/kudu/tools/tool_action_common.h           |  66 ++++
 src/kudu/tools/tool_action_test.cc            | 392 +++++++++++++++++++++
 src/kudu/tools/tool_main.cc                   |   1 +
 10 files changed, 1209 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/761ce10b/src/kudu/mini-cluster/external_mini_cluster.h
----------------------------------------------------------------------
diff --git a/src/kudu/mini-cluster/external_mini_cluster.h b/src/kudu/mini-cluster/external_mini_cluster.h
index 05c8106..29c209e 100644
--- a/src/kudu/mini-cluster/external_mini_cluster.h
+++ b/src/kudu/mini-cluster/external_mini_cluster.h
@@ -228,7 +228,11 @@ class ExternalMiniCluster : public MiniCluster {
   std::vector<ExternalDaemon*> daemons() const;
 
   MiniKdc* kdc() const {
-    return CHECK_NOTNULL(kdc_.get());
+    return kdc_.get();
+  }
+
+  const std::string& data_root() const {
+    return opts_.data_root;
   }
 
   int num_tablet_servers() const override {
@@ -424,6 +428,8 @@ class ExternalDaemon : public RefCountedThreadSafe<ExternalDaemon> {
   // crash from SIGABRT.
   Status WaitForFatal(const MonoDelta& timeout) const;
 
+  virtual Status Start() = 0;
+  virtual Status Restart() = 0;
   virtual void Shutdown();
 
   // Delete files specified by 'wal_dir_' and 'data_dirs_'.
@@ -539,11 +545,11 @@ class ExternalMaster : public ExternalDaemon {
  public:
   explicit ExternalMaster(ExternalDaemonOptions opts);
 
-  Status Start();
+  virtual Status Start() override;
 
   // Restarts the daemon.
   // Requires that it has previously been shutdown.
-  Status Restart() WARN_UNUSED_RESULT;
+  virtual Status Restart() override WARN_UNUSED_RESULT;
 
   // Blocks until the master's catalog manager is initialized and responding to
   // RPCs.
@@ -561,11 +567,11 @@ class ExternalTabletServer : public ExternalDaemon {
   ExternalTabletServer(ExternalDaemonOptions opts,
                        std::vector<HostPort> master_addrs);
 
-  Status Start();
+  virtual Status Start() override;
 
   // Restarts the daemon.
   // Requires that it has previously been shutdown.
-  Status Restart() WARN_UNUSED_RESULT;
+  virtual Status Restart() override WARN_UNUSED_RESULT;
 
  private:
   const std::vector<HostPort> master_addrs_;

http://git-wip-us.apache.org/repos/asf/kudu/blob/761ce10b/src/kudu/security/test/mini_kdc.cc
----------------------------------------------------------------------
diff --git a/src/kudu/security/test/mini_kdc.cc b/src/kudu/security/test/mini_kdc.cc
index 95213f5..7830a2b 100644
--- a/src/kudu/security/test/mini_kdc.cc
+++ b/src/kudu/security/test/mini_kdc.cc
@@ -162,7 +162,9 @@ Status MiniKdc::Start() {
 }
 
 Status MiniKdc::Stop() {
-  CHECK(kdc_process_);
+  if (!kdc_process_) {
+    return Status::OK();
+  }
   VLOG(1) << "Stopping KDC";
   unique_ptr<Subprocess> proc(kdc_process_.release());
   RETURN_NOT_OK(proc->Kill(SIGKILL));

http://git-wip-us.apache.org/repos/asf/kudu/blob/761ce10b/src/kudu/tools/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/tools/CMakeLists.txt b/src/kudu/tools/CMakeLists.txt
index c7ebb41..bcfc9c2 100644
--- a/src/kudu/tools/CMakeLists.txt
+++ b/src/kudu/tools/CMakeLists.txt
@@ -16,19 +16,41 @@
 # under the License.
 
 set(LINK_LIBS
-  kudu_client
-  log
+  cfile
   consensus
-  tserver
+  gutil
+  kudu_client
   kudu_common
   kudu_fs
   kudu_util
-  gutil
-  cfile
+  log
   tablet
+  tserver
   ${KUDU_BASE_LIBS}
 )
 
+#######################################
+# tool_proto
+#######################################
+
+PROTOBUF_GENERATE_CPP(
+  TOOL_PROTO_SRCS TOOL_PROTO_HDRS TOOL_PROTO_TGTS
+  SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/../..
+  BINARY_ROOT ${CMAKE_CURRENT_BINARY_DIR}/../..
+  PROTO_FILES tool.proto)
+
+add_library(tool_proto
+  ${TOOL_PROTO_SRCS}
+  ${TOOL_PROTO_HDRS})
+target_link_libraries(tool_proto
+  kudu_common_proto
+  protobuf
+  wire_protocol_proto)
+
+#######################################
+# kudu_tools_util
+#######################################
+
 add_library(kudu_tools_util
   color.cc
   data_gen_util.cc
@@ -36,12 +58,21 @@ add_library(kudu_tools_util
   tool_action_common.cc
 )
 target_link_libraries(kudu_tools_util
+  tool_proto
   ${LINK_LIBS})
 
+#######################################
+# create-demo-table
+#######################################
+
 add_executable(create-demo-table create-demo-table.cc)
 target_link_libraries(create-demo-table
   ${LINK_LIBS})
 
+#######################################
+# ksck
+#######################################
+
 add_library(ksck
     ksck.cc
     ksck_remote.cc
@@ -56,6 +87,10 @@ target_link_libraries(ksck
   ${KUDU_BASE_LIBS}
 )
 
+#######################################
+# kudu
+#######################################
+
 add_executable(kudu
   tool_action_cluster.cc
   tool_action_fs.cc
@@ -66,6 +101,7 @@ add_executable(kudu
   tool_action_remote_replica.cc
   tool_action_table.cc
   tool_action_tablet.cc
+  tool_action_test.cc
   tool_action_tserver.cc
   tool_action_wal.cc
   tool_main.cc
@@ -83,11 +119,17 @@ target_link_libraries(kudu
   kudu_util
   log
   master
+  mini_cluster
   tablet
+  tool_proto
   tserver
   ${KUDU_BASE_LIBS}
 )
 
+#######################################
+# kudu_tools_test_util
+#######################################
+
 add_library(kudu_tools_test_util
   tool_test_util.cc
 )
@@ -95,6 +137,10 @@ target_link_libraries(kudu_tools_test_util
   kudu_util
 )
 
+#######################################
+# Unit tests
+#######################################
+
 set(KUDU_TEST_LINK_LIBS
   ksck
   kudu_tools_util

http://git-wip-us.apache.org/repos/asf/kudu/blob/761ce10b/src/kudu/tools/kudu-tool-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index 1779cef..7590d32 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -25,6 +25,7 @@
 #include <memory>
 #include <sstream>
 #include <string>
+#include <tuple>  // IWYU pragma: keep
 #include <unordered_map>
 #include <utility>
 #include <vector>
@@ -85,6 +86,8 @@
 #include "kudu/tablet/tablet.pb.h"
 #include "kudu/tablet/tablet_metadata.h"
 #include "kudu/tablet/tablet_replica.h"
+#include "kudu/tools/tool.pb.h"
+#include "kudu/tools/tool_action_common.h"
 #include "kudu/tools/tool_test_util.h"
 #include "kudu/tserver/mini_tablet_server.h"
 #include "kudu/tserver/tablet_server.h"
@@ -381,6 +384,7 @@ TEST_F(ToolTest, TestHelpXML) {
       "remote_replica",
       "table",
       "tablet",
+      "test",
       "tserver",
       "wal",
       "dump",
@@ -404,6 +408,7 @@ TEST_F(ToolTest, TestTopLevelHelp) {
       "remote_replica.*tablet replicas on a Kudu Tablet Server",
       "table.*Kudu tables",
       "tablet.*Kudu tablets",
+      "test.*test actions",
       "tserver.*Kudu Tablet Server",
       "wal.*write-ahead log"
   };
@@ -526,6 +531,12 @@ TEST_F(ToolTest, TestModeHelp) {
     NO_FATALS(RunTestHelp("tablet", kTabletModeRegexes));
   }
   {
+    const vector<string> kTestModeRegexes = {
+        "mini_cluster.*Spawn a control shell"
+    };
+    NO_FATALS(RunTestHelp("test", kTestModeRegexes));
+  }
+  {
     const vector<string> kChangeConfigModeRegexes = {
         "add_replica.*Add a new replica",
         "change_replica_type.*Change the type of an existing replica",
@@ -1741,5 +1752,320 @@ TEST_F(ToolTest, TestMasterList) {
   ASSERT_STR_CONTAINS(out, master->bound_rpc_hostport().ToString());
 }
 
+// This test is parameterized on the serialization mode and Kerberos.
+class ControlShellToolTest :
+    public ToolTest,
+    public ::testing::WithParamInterface<std::tuple<ControlShellProtocol::SerializationMode,
+                                                    bool>> {
+ public:
+  virtual void SetUp() override {
+    ToolTest::SetUp();
+
+    // Start the control shell.
+    string mode;
+    switch (serde_mode()) {
+      case ControlShellProtocol::SerializationMode::JSON: mode = "json"; break;
+      case ControlShellProtocol::SerializationMode::PB: mode = "pb"; break;
+      default: LOG(FATAL) << "Unknown serialization mode";
+    }
+    shell_.reset(new Subprocess({
+      tool_path_,
+      "test",
+      "mini_cluster",
+      Substitute("--serialization=$0", mode)
+    }));
+    shell_->ShareParentStdin(false);
+    shell_->ShareParentStdout(false);
+    ASSERT_OK(shell_->Start());
+
+    // Start the protocol interface.
+    proto_.reset(new ControlShellProtocol(serde_mode(),
+                                          ControlShellProtocol::CloseMode::CLOSE_ON_DESTROY,
+                                          shell_->ReleaseChildStdoutFd(),
+                                          shell_->ReleaseChildStdinFd()));
+  }
+
+  virtual void TearDown() override {
+    if (proto_) {
+      // Stopping the protocol interface will close the fds, causing the shell
+      // to exit on its own.
+      proto_.reset();
+      ASSERT_OK(shell_->Wait());
+      int exit_status;
+      ASSERT_OK(shell_->GetExitStatus(&exit_status));
+      ASSERT_EQ(0, exit_status);
+    }
+    ToolTest::TearDown();
+  }
+
+ protected:
+  // Send a control message to the shell and receive its response.
+  Status SendReceive(const ControlShellRequestPB& req, ControlShellResponsePB* resp) {
+    RETURN_NOT_OK(proto_->SendMessage(req));
+    RETURN_NOT_OK(proto_->ReceiveMessage(resp));
+    if (resp->has_error()) {
+      return StatusFromPB(resp->error());
+    }
+    return Status::OK();
+  }
+
+  ControlShellProtocol::SerializationMode serde_mode() const {
+    return ::testing::get<0>(GetParam());
+  }
+
+  bool enable_kerberos() const {
+    return ::testing::get<1>(GetParam());
+  }
+
+  unique_ptr<Subprocess> shell_;
+  unique_ptr<ControlShellProtocol> proto_;
+};
+
+INSTANTIATE_TEST_CASE_P(SerializationModes, ControlShellToolTest,
+                        ::testing::Combine(::testing::Values(
+                            ControlShellProtocol::SerializationMode::PB,
+                            ControlShellProtocol::SerializationMode::JSON),
+                                           ::testing::Bool()));
+
+TEST_P(ControlShellToolTest, TestControlShell) {
+  const int kNumMasters = 1;
+  const int kNumTservers = 3;
+
+  // Create an illegal cluster first, to make sure that the shell continues to
+  // work in the event of an error.
+  {
+    ControlShellRequestPB req;
+    ControlShellResponsePB resp;
+    req.mutable_create_cluster()->set_num_masters(4);
+    ASSERT_OK(proto_->SendMessage(req));
+    ASSERT_OK(proto_->ReceiveMessage(&resp));
+    ASSERT_TRUE(resp.has_error());
+    Status s = StatusFromPB(resp.error());
+    ASSERT_TRUE(s.IsInvalidArgument());
+    ASSERT_STR_CONTAINS(s.ToString(), "only one or three masters are supported");
+  }
+
+  // Create a cluster.
+  {
+    ControlShellRequestPB req;
+    ControlShellResponsePB resp;
+    req.mutable_create_cluster()->set_data_root(JoinPathSegments(
+        test_dir_, "minicluster-data"));
+    req.mutable_create_cluster()->set_num_masters(kNumMasters);
+    req.mutable_create_cluster()->set_num_tservers(kNumTservers);
+    req.mutable_create_cluster()->set_enable_kerberos(enable_kerberos());
+    ASSERT_OK(SendReceive(req, &resp));
+  }
+
+  // Try creating a second cluster. It should fail.
+  {
+    ControlShellRequestPB req;
+    ControlShellResponsePB resp;
+    req.mutable_create_cluster()->set_data_root(JoinPathSegments(
+        test_dir_, "minicluster-data"));
+    ASSERT_OK(proto_->SendMessage(req));
+    ASSERT_OK(proto_->ReceiveMessage(&resp));
+    ASSERT_TRUE(resp.has_error());
+    Status s = StatusFromPB(resp.error());
+    ASSERT_TRUE(s.IsInvalidArgument());
+    ASSERT_STR_CONTAINS(s.ToString(), "cluster already created");
+  }
+
+  // Start it.
+  {
+    ControlShellRequestPB req;
+    ControlShellResponsePB resp;
+    req.mutable_start_cluster();
+    ASSERT_OK(SendReceive(req, &resp));
+  }
+
+  // Get the masters.
+  vector<DaemonInfoPB> masters;
+  {
+    ControlShellRequestPB req;
+    ControlShellResponsePB resp;
+    req.mutable_get_masters();
+    ASSERT_OK(SendReceive(req, &resp));
+    ASSERT_TRUE(resp.has_get_masters());
+    ASSERT_EQ(kNumMasters, resp.get_masters().masters_size());
+    masters.assign(resp.get_masters().masters().begin(),
+                   resp.get_masters().masters().end());
+  }
+
+  if (enable_kerberos()) {
+    // Set up the KDC environment variables so that a client can authenticate
+    // to this cluster.
+    //
+    // Normally this is handled automatically by the cluster's MiniKdc, but
+    // since the cluster is running in a subprocess, we have to do it manually.
+    unordered_map<string, string> kdc_env_vars;
+    ControlShellRequestPB req;
+    ControlShellResponsePB resp;
+    req.mutable_get_kdc_env_vars();
+    ASSERT_OK(SendReceive(req, &resp));
+    ASSERT_TRUE(resp.has_get_kdc_env_vars());
+    for (const auto& e : resp.get_kdc_env_vars().env_vars()) {
+      PCHECK(setenv(e.first.c_str(), e.second.c_str(), 1) == 0);
+    }
+  } else {
+    // get_kdc_env_vars should fail on a non-Kerberized cluster.
+    ControlShellRequestPB req;
+    ControlShellResponsePB resp;
+    req.mutable_get_kdc_env_vars();
+    ASSERT_OK(proto_->SendMessage(req));
+    ASSERT_OK(proto_->ReceiveMessage(&resp));
+    ASSERT_TRUE(resp.has_error());
+    Status s = StatusFromPB(resp.error());
+    ASSERT_TRUE(s.IsNotFound());
+    ASSERT_STR_CONTAINS(s.ToString(), "kdc not found");
+  }
+
+  // Create a table.
+  {
+    client::KuduClientBuilder client_builder;
+    for (const auto& e : masters) {
+      HostPort hp;
+      ASSERT_OK(HostPortFromPB(e.bound_rpc_address(), &hp));
+      client_builder.add_master_server_addr(hp.ToString());
+    }
+    shared_ptr<client::KuduClient> client;
+    ASSERT_OK(client_builder.Build(&client));
+    client::KuduSchemaBuilder schema_builder;
+    schema_builder.AddColumn("foo")
+              ->Type(client::KuduColumnSchema::INT32)
+              ->NotNull()
+              ->PrimaryKey();
+    client::KuduSchema schema;
+    ASSERT_OK(schema_builder.Build(&schema));
+    unique_ptr<client::KuduTableCreator> table_creator(
+        client->NewTableCreator());
+    ASSERT_OK(table_creator->table_name("test")
+              .schema(&schema)
+              .set_range_partition_columns({ "foo" })
+              .Create());
+  }
+
+  // Get the tservers.
+  vector<DaemonInfoPB> tservers;
+  {
+    ControlShellRequestPB req;
+    ControlShellResponsePB resp;
+    req.mutable_get_tservers();
+    ASSERT_OK(SendReceive(req, &resp));
+    ASSERT_TRUE(resp.has_get_tservers());
+    ASSERT_EQ(kNumTservers, resp.get_tservers().tservers_size());
+    tservers.assign(resp.get_tservers().tservers().begin(),
+                    resp.get_tservers().tservers().end());
+  }
+
+  // Stop a tserver.
+  {
+    ControlShellRequestPB req;
+    ControlShellResponsePB resp;
+    *req.mutable_stop_daemon()->mutable_id() = tservers[0].id();
+    ASSERT_OK(SendReceive(req, &resp));
+  }
+
+  // Restart it.
+  {
+    ControlShellRequestPB req;
+    ControlShellResponsePB resp;
+    *req.mutable_start_daemon()->mutable_id() = tservers[0].id();
+    ASSERT_OK(SendReceive(req, &resp));
+  }
+
+  // Stop a master.
+  {
+    ControlShellRequestPB req;
+    ControlShellResponsePB resp;
+    *req.mutable_stop_daemon()->mutable_id() = masters[0].id();
+    ASSERT_OK(SendReceive(req, &resp));
+  }
+
+  // Restart it.
+  {
+    ControlShellRequestPB req;
+    ControlShellResponsePB resp;
+    *req.mutable_start_daemon()->mutable_id() = masters[0].id();
+    ASSERT_OK(SendReceive(req, &resp));
+  }
+
+  // Restart some non-existent daemons.
+  vector<DaemonIdentifierPB> daemons_to_restart;
+  {
+    // Unknown daemon type.
+    DaemonIdentifierPB id;
+    id.set_type(UNKNOWN_DAEMON);
+    daemons_to_restart.emplace_back(std::move(id));
+  }
+  {
+    // Tablet server #5.
+    DaemonIdentifierPB id;
+    id.set_type(TSERVER);
+    id.set_index(5);
+    daemons_to_restart.emplace_back(std::move(id));
+  }
+  {
+    // Master without an index.
+    DaemonIdentifierPB id;
+    id.set_type(MASTER);
+    daemons_to_restart.emplace_back(std::move(id));
+  }
+  if (!enable_kerberos()) {
+    // KDC for a non-Kerberized cluster.
+    DaemonIdentifierPB id;
+    id.set_type(KDC);
+    daemons_to_restart.emplace_back(std::move(id));
+  }
+  for (const auto& daemon : daemons_to_restart) {
+    ControlShellRequestPB req;
+    ControlShellResponsePB resp;
+    *req.mutable_start_daemon()->mutable_id() = daemon;
+    ASSERT_OK(proto_->SendMessage(req));
+    ASSERT_OK(proto_->ReceiveMessage(&resp));
+    ASSERT_TRUE(resp.has_error());
+  }
+
+  // Stop the cluster.
+  {
+    ControlShellRequestPB req;
+    ControlShellResponsePB resp;
+    req.mutable_stop_cluster();
+    ASSERT_OK(SendReceive(req, &resp));
+  }
+
+  // Restart it.
+  {
+    ControlShellRequestPB req;
+    ControlShellResponsePB resp;
+    req.mutable_start_cluster();
+    ASSERT_OK(SendReceive(req, &resp));
+  }
+
+  if (enable_kerberos()) {
+    // Restart the KDC.
+    {
+      ControlShellRequestPB req;
+      ControlShellResponsePB resp;
+      req.mutable_stop_daemon()->mutable_id()->set_type(KDC);
+      ASSERT_OK(SendReceive(req, &resp));
+    }
+    {
+      ControlShellRequestPB req;
+      ControlShellResponsePB resp;
+      req.mutable_start_daemon()->mutable_id()->set_type(KDC);
+      ASSERT_OK(SendReceive(req, &resp));
+    }
+  }
+
+  // Destroy the cluster.
+  {
+    ControlShellRequestPB req;
+    ControlShellResponsePB resp;
+    req.mutable_destroy_cluster();
+    ASSERT_OK(SendReceive(req, &resp));
+  }
+}
+
 } // namespace tools
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/761ce10b/src/kudu/tools/tool.proto
----------------------------------------------------------------------
diff --git a/src/kudu/tools/tool.proto b/src/kudu/tools/tool.proto
new file mode 100644
index 0000000..ff0b0db
--- /dev/null
+++ b/src/kudu/tools/tool.proto
@@ -0,0 +1,173 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+syntax = "proto2";
+package kudu.tools;
+
+option java_package = "org.apache.kudu.tools";
+
+import "kudu/common/common.proto";
+import "kudu/common/wire_protocol.proto";
+
+// Creates a new ExternalMiniCluster.
+//
+// The below fields generally map to options from ExternalMiniClusterOptions.
+// If not provided, the defaults from that class will be used instead.
+//
+// Only one cluster may be created at a time.
+message CreateClusterRequestPB {
+  // The desired number of masters.
+  //
+  // Currently only one or three masters are supported.
+  optional int32 num_masters = 1;
+
+  // The desired number of tablet servers.
+  optional int32 num_tservers = 2;
+
+  // Whether or not the cluster should be Kerberized.
+  optional bool enable_kerberos = 3;
+
+  // The directory where the cluster's data and logs should be placed.
+  optional string data_root = 4;
+
+  // Any additional gflags for masters or tablet servers. Each should be in
+  // a format that's expected by gflags (e.g. "--foo=bar").
+  repeated string extra_master_flags = 5;
+  repeated string extra_tserver_flags = 6;
+}
+
+// Destroys a cluster created via 'create_cluster'.
+message DestroyClusterRequestPB {}
+
+// Starts all daemons in a newly created cluster, or restarts all daemons
+// in a stopped cluster.
+//
+// No-op for already started clusters.
+message StartClusterRequestPB {}
+
+// Stops a cluster.
+//
+// No-op for already stopped clusters.
+message StopClusterRequestPB {}
+
+// Type of daemon managed in a cluster.
+enum DaemonType {
+  UNKNOWN_DAEMON = 0;
+  MASTER = 1;
+  TSERVER = 2;
+  KDC = 3;
+}
+
+// Identifier for a cluster daemon, unique to the cluster.
+message DaemonIdentifierPB {
+  // Whether the daemon is a master, tserver, or whatever.
+  optional DaemonType type = 1;
+
+  // Index of the daemon in the cluster, if the cluster has multiple daemons
+  // of this type.
+  optional uint32 index = 2;
+}
+
+// Restarts a stopped daemon.
+message StartDaemonRequestPB {
+  // The identifier of the daemon to be restarted. This identifier is unique
+  // and immutable for the lifetime of the cluster.
+  optional DaemonIdentifierPB id = 1;
+}
+
+// Stops a started daemon.
+//
+// No-op for already stopped daemons.
+message StopDaemonRequestPB {
+  // The identifier for the daemon to be stopped. This identifier is unique
+  // and immutable for the lifetime of the cluster.
+  optional DaemonIdentifierPB id = 1;
+}
+
+// Daemon information.
+message DaemonInfoPB {
+  // Unique identifier of the daemon.
+  optional DaemonIdentifierPB id = 1;
+
+  // Daemon's bound RPC address.
+  optional HostPortPB bound_rpc_address = 2;
+}
+
+// Response to a GetMastersRequestPB.
+message GetMastersResponsePB {
+  // List of masters.
+  repeated DaemonInfoPB masters = 1;
+}
+
+// Gets information on each started master.
+message GetMastersRequestPB {}
+
+// Response to a GetTServersRequestPB.
+message GetTServersResponsePB {
+  // List of tablet servers.
+  repeated DaemonInfoPB tservers = 1;
+}
+
+// Gets information on each started tablet server.
+message GetTServersRequestPB {}
+
+// Response to a GetKDCEnvVarsRequestPB.
+message GetKDCEnvVarsResponsePB {
+
+  // Environment variables, mapped from key to value.
+  map<string, string> env_vars = 1;
+}
+
+// Gets all environment variables another process may need in order to
+// communicate with this cluster's KDC.
+//
+// It is an error to call this on a non-Kerberized cluster.
+message GetKDCEnvVarsRequestPB {}
+
+// Sent by the control shell in response to a control shell command request.
+message ControlShellResponsePB {
+
+  // Only set if there was some kind of shell-side error.
+  optional AppStatusPB error = 1;
+
+  // The command response. Only set for commands that actually expect a response.
+  oneof response {
+    GetMastersResponsePB get_masters = 2;
+    GetTServersResponsePB get_tservers = 3;
+    GetKDCEnvVarsResponsePB get_kdc_env_vars = 4;
+  }
+}
+
+// Command sent to the control shell.
+//
+// Because the control shell communicates via pipe and not krpc, we can't make
+// use of service dispatch and must instead multiplex all command requests and
+// responses via ControlShellRequestPB and ControlShellResponsePB respectively.
+message ControlShellRequestPB {
+
+  // The command request.
+  oneof request {
+    CreateClusterRequestPB create_cluster = 1;
+    DestroyClusterRequestPB destroy_cluster = 2;
+    StartClusterRequestPB start_cluster = 3;
+    StopClusterRequestPB stop_cluster = 4;
+    StartDaemonRequestPB start_daemon = 5;
+    StopDaemonRequestPB stop_daemon = 6;
+    GetMastersRequestPB get_masters = 7;
+    GetTServersRequestPB get_tservers = 8;
+    GetKDCEnvVarsRequestPB get_kdc_env_vars = 9;
+  }
+}

http://git-wip-us.apache.org/repos/asf/kudu/blob/761ce10b/src/kudu/tools/tool_action.h
----------------------------------------------------------------------
diff --git a/src/kudu/tools/tool_action.h b/src/kudu/tools/tool_action.h
index cdb6d79..8a86e36 100644
--- a/src/kudu/tools/tool_action.h
+++ b/src/kudu/tools/tool_action.h
@@ -316,6 +316,7 @@ std::unique_ptr<Mode> BuildPerfMode();
 std::unique_ptr<Mode> BuildRemoteReplicaMode();
 std::unique_ptr<Mode> BuildTableMode();
 std::unique_ptr<Mode> BuildTabletMode();
+std::unique_ptr<Mode> BuildTestMode();
 std::unique_ptr<Mode> BuildTServerMode();
 std::unique_ptr<Mode> BuildWalMode();
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/761ce10b/src/kudu/tools/tool_action_common.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/tool_action_common.cc b/src/kudu/tools/tool_action_common.cc
index fea476d..35471ee 100644
--- a/src/kudu/tools/tool_action_common.cc
+++ b/src/kudu/tools/tool_action_common.cc
@@ -17,7 +17,10 @@
 
 #include "kudu/tools/tool_action_common.h"
 
+#include <unistd.h>
+
 #include <algorithm>
+#include <cerrno>
 #include <cstddef>
 #include <iomanip>
 #include <iostream>
@@ -30,10 +33,10 @@
 #include <boost/algorithm/string/predicate.hpp>
 #include <gflags/gflags.h>
 #include <glog/logging.h>
+#include <google/protobuf/util/json_util.h>
 
-#include "kudu/client/client.h"
 #include "kudu/client/client-internal.h"  // IWYU pragma: keep
-#include "kudu/gutil/map-util.h"
+#include "kudu/client/client.h"
 #include "kudu/client/shared_ptr.h"
 #include "kudu/common/common.pb.h"
 #include "kudu/common/row_operations.h"
@@ -44,19 +47,24 @@
 #include "kudu/consensus/log.pb.h"
 #include "kudu/consensus/log_util.h"
 #include "kudu/consensus/opid.pb.h"
+#include "kudu/gutil/endian.h"
+#include "kudu/gutil/map-util.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/numbers.h"
 #include "kudu/gutil/strings/split.h"
+#include "kudu/gutil/strings/substitute.h"
 #include "kudu/master/master.proxy.h" // IWYU pragma: keep
 #include "kudu/rpc/messenger.h"
 #include "kudu/rpc/rpc_controller.h"
 #include "kudu/rpc/rpc_header.pb.h"
 #include "kudu/server/server_base.pb.h"
 #include "kudu/server/server_base.proxy.h"
+#include "kudu/tools/tool.pb.h" // IWYU pragma: keep
 #include "kudu/tools/tool_action.h"
 #include "kudu/tserver/tserver.pb.h"
-#include "kudu/tserver/tserver_service.proxy.h" // IWYU pragma: keep
 #include "kudu/tserver/tserver_admin.proxy.h"   // IWYU pragma: keep
+#include "kudu/tserver/tserver_service.proxy.h" // IWYU pragma: keep
+#include "kudu/util/faststring.h"
 #include "kudu/util/jsonwriter.h"
 #include "kudu/util/memory/arena.h"
 #include "kudu/util/monotime.h"
@@ -136,6 +144,7 @@ using std::shared_ptr;
 using std::string;
 using std::unique_ptr;
 using std::vector;
+using strings::Substitute;
 using tserver::TabletServerAdminServiceProxy;
 using tserver::TabletServerServiceProxy;
 using tserver::WriteRequestPB;
@@ -578,5 +587,178 @@ Status LeaderMasterProxy::SyncRpc(const ListMastersRequestPB& req,
                                                                ListMastersResponsePB*,
                                                                RpcController*)>& func);
 
+const int ControlShellProtocol::kMaxMessageBytes = 1024 * 1024;
+
+ControlShellProtocol::ControlShellProtocol(SerializationMode serialization_mode,
+                                           CloseMode close_mode,
+                                           int read_fd,
+                                           int write_fd)
+    : serialization_mode_(serialization_mode),
+      close_mode_(close_mode),
+      read_fd_(read_fd),
+      write_fd_(write_fd) {
+}
+
+ControlShellProtocol::~ControlShellProtocol() {
+  if (close_mode_ == CloseMode::CLOSE_ON_DESTROY) {
+    close(read_fd_);
+    close(write_fd_);
+  }
+}
+
+template <class M>
+Status ControlShellProtocol::ReceiveMessage(M* message) {
+  switch (serialization_mode_) {
+    case SerializationMode::JSON:
+    {
+      // Read and accumulate one byte at a time, looking for the newline.
+      //
+      // TODO(adar): it would be more efficient to read a chunk of data, look
+      // for a newline, and if found, store the remainder for the next message.
+      faststring buf;
+      faststring one_byte;
+      one_byte.resize(1);
+      while (true) {
+        RETURN_NOT_OK_PREPEND(DoRead(&one_byte), "unable to receive message byte");
+        if (one_byte[0] == '\n') {
+          break;
+        }
+        buf.push_back(one_byte[0]);
+      }
+
+      // Parse the JSON-encoded message.
+      const auto& google_status =
+          google::protobuf::util::JsonStringToMessage(buf.ToString(), message);
+      if (!google_status.ok()) {
+        return Status::InvalidArgument(
+            Substitute("unable to parse JSON: $0", buf.ToString()),
+            google_status.error_message().ToString());
+      }
+      break;
+    }
+    case SerializationMode::PB:
+    {
+      // Read four bytes of size (big-endian).
+      faststring size_buf;
+      size_buf.resize(sizeof(uint32_t));
+      RETURN_NOT_OK_PREPEND(DoRead(&size_buf), "unable to receive message size");
+      uint32_t body_size = NetworkByteOrder::Load32(size_buf.data());
+
+      if (body_size > kMaxMessageBytes) {
+        return Status::IOError(
+            Substitute("message size ($0) exceeds maximum message size ($1)",
+                       body_size, kMaxMessageBytes));
+      }
+
+      // Read the variable size body.
+      faststring body_buf;
+      body_buf.resize(body_size);
+      RETURN_NOT_OK_PREPEND(DoRead(&body_buf), "unable to receive message body");
+
+      // Parse the body into a PB request.
+      RETURN_NOT_OK_PREPEND(pb_util::ParseFromArray(
+          message, body_buf.data(), body_buf.length()),
+                            Substitute("unable to parse PB: $0", body_buf.ToString()));
+      break;
+    }
+    default: LOG(FATAL) << "Unknown mode";
+  }
+
+  VLOG(1) << "Received message: " << pb_util::SecureDebugString(*message);
+  return Status::OK();
+}
+
+template <class M>
+Status ControlShellProtocol::SendMessage(const M& message) {
+  VLOG(1) << "Sending message: " << pb_util::SecureDebugString(message);
+
+  faststring buf;
+  switch (serialization_mode_) {
+    case SerializationMode::JSON:
+    {
+      string serialized;
+      const auto& google_status =
+          google::protobuf::util::MessageToJsonString(message, &serialized);
+      if (!google_status.ok()) {
+        return Status::InvalidArgument(Substitute(
+            "unable to serialize JSON: $0", pb_util::SecureDebugString(message)),
+                                       google_status.error_message().ToString());
+      }
+
+      buf.append(serialized);
+      buf.append("\n");
+      break;
+    }
+    case SerializationMode::PB:
+    {
+      size_t msg_size = message.ByteSizeLong();
+      buf.resize(sizeof(uint32_t) + msg_size);
+      NetworkByteOrder::Store32(buf.data(), msg_size);
+      if (!message.SerializeWithCachedSizesToArray(buf.data() + sizeof(uint32_t))) {
+        return Status::Corruption("failed to serialize PB to array");
+      }
+      break;
+    }
+    default:
+      break;
+  }
+  RETURN_NOT_OK_PREPEND(DoWrite(buf), "unable to send message");
+  return Status::OK();
+}
+
+Status ControlShellProtocol::DoRead(faststring* buf) {
+  uint8_t* pos = buf->data();
+  size_t rem = buf->length();
+  while (rem > 0) {
+    ssize_t r = read(read_fd_, pos, rem);
+    if (r == -1) {
+      if (errno == EINTR) {
+        // Interrupted by a signal, retry.
+        continue;
+      }
+      return Status::IOError("Error reading from pipe", "", errno);
+    }
+    if (r == 0) {
+      return Status::EndOfFile("Other end of pipe was closed");
+    }
+    DCHECK_GE(rem, r);
+    rem -= r;
+    pos += r;
+  }
+  return Status::OK();
+}
+
+Status ControlShellProtocol::DoWrite(const faststring& buf) {
+  const uint8_t* pos = buf.data();
+  size_t rem = buf.length();
+  while (rem > 0) {
+    ssize_t r = write(write_fd_, pos, rem);
+    if (r == -1) {
+      if (errno == EINTR) {
+        // Interrupted by a signal, retry.
+        continue;
+      }
+      if (errno == EPIPE) {
+        return Status::EndOfFile("Other end of pipe was closed");
+      }
+      return Status::IOError("Error writing to pipe", "", errno);
+    }
+    DCHECK_GE(rem, r);
+    rem -= r;
+    pos += r;
+  }
+  return Status::OK();
+}
+
+// Explicit instantiations for callers outside this compilation unit.
+template
+Status ControlShellProtocol::ReceiveMessage(ControlShellRequestPB* message);
+template
+Status ControlShellProtocol::ReceiveMessage(ControlShellResponsePB* message);
+template
+Status ControlShellProtocol::SendMessage(const ControlShellRequestPB& message);
+template
+Status ControlShellProtocol::SendMessage(const ControlShellResponsePB& message);
+
 } // namespace tools
 } // namespace kudu
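
For readers following the DoRead() loop and the byte-at-a-time JSON path above, here is a minimal standalone sketch of the same idea using only POSIX and the standard library. ReadFully and ReadLine are hypothetical names chosen for illustration; they are not part of the commit, and they return a bool where the real code returns a Status.

```cpp
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <string>
#include <unistd.h>

// Hypothetical helper: reads exactly 'len' bytes from 'fd' into 'out'.
// Returns false on a read error or if the writer closed the pipe early.
bool ReadFully(int fd, std::string* out, size_t len) {
  assert(out != nullptr);
  out->assign(len, '\0');
  size_t done = 0;
  while (done < len) {
    ssize_t r = read(fd, &(*out)[done], len - done);
    if (r == -1) {
      if (errno == EINTR) continue;  // Interrupted by a signal, retry.
      return false;
    }
    if (r == 0) return false;  // Other end of the pipe was closed.
    done += static_cast<size_t>(r);
  }
  return true;
}

// Hypothetical helper: accumulates one byte at a time until a newline,
// which is consumed but not stored (the JSON mode's message delimiter).
bool ReadLine(int fd, std::string* line) {
  line->clear();
  std::string one;
  while (true) {
    if (!ReadFully(fd, &one, 1)) return false;
    if (one[0] == '\n') return true;
    line->push_back(one[0]);
  }
}
```

Short reads are normal on pipes, which is why the loop keeps going until the requested length has arrived rather than trusting a single read() call.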

http://git-wip-us.apache.org/repos/asf/kudu/blob/761ce10b/src/kudu/tools/tool_action_common.h
----------------------------------------------------------------------
diff --git a/src/kudu/tools/tool_action_common.h b/src/kudu/tools/tool_action_common.h
index d9698ed..ea08643 100644
--- a/src/kudu/tools/tool_action_common.h
+++ b/src/kudu/tools/tool_action_common.h
@@ -24,6 +24,7 @@
 #include <vector>
 
 #include "kudu/client/shared_ptr.h"
+#include "kudu/gutil/macros.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/util/status.h"
@@ -35,6 +36,8 @@ class function;
 
 namespace kudu {
 
+class faststring;
+
 namespace client {
 class KuduClient;
 } // namespace client
@@ -167,5 +170,68 @@ class LeaderMasterProxy {
   client::sp::shared_ptr<client::KuduClient> client_;
 };
 
+// Facilitates sending and receiving messages with the tool control shell.
+//
+// May be used by a subprocess communicating with the shell via pipe, or by the
+// shell itself to read/write messages via stdin/stdout respectively.
+class ControlShellProtocol {
+ public:
+  enum class SerializationMode {
+    // Each message is serialized as a four byte big-endian size followed by
+    // the protobuf-encoded message itself.
+    PB,
+
+    // Each message is serialized into a protobuf-like JSON representation
+    // terminated with a newline character.
+    JSON,
+  };
+
+  // Whether the provided fds are closed at class destruction time.
+  enum class CloseMode {
+    CLOSE_ON_DESTROY,
+    NO_CLOSE_ON_DESTROY,
+  };
+
+  // Constructs a new protocol instance.
+  //
+  // If 'close_mode' is CLOSE_ON_DESTROY, the instance has effectively taken
+  // control of 'read_fd' and 'write_fd' and the caller shouldn't use them.
+  ControlShellProtocol(SerializationMode serialization_mode,
+                       CloseMode close_mode,
+                       int read_fd,
+                       int write_fd);
+
+  ~ControlShellProtocol();
+
+  // Receives a protobuf message, blocking if the pipe is empty.
+  //
+  // Returns EndOfFile if the writer on the other end of the pipe was closed.
+  //
+  // Returns an error if serialization_mode_ is PB and the received message
+  // size exceeds kMaxMessageBytes.
+  template <class M>
+  Status ReceiveMessage(M* message);
+
+  // Sends a protobuf message, blocking if the pipe is full.
+  //
+  // Returns EndOfFile if the reader on the other end of the pipe was closed.
+  template <class M>
+  Status SendMessage(const M& message);
+
+ private:
+  // Private helpers to drive actual pipe reading and writing.
+  Status DoRead(faststring* buf);
+  Status DoWrite(const faststring& buf);
+
+  static const int kMaxMessageBytes;
+
+  const SerializationMode serialization_mode_;
+  const CloseMode close_mode_;
+  const int read_fd_;
+  const int write_fd_;
+
+  DISALLOW_COPY_AND_ASSIGN(ControlShellProtocol);
+};
+
 } // namespace tools
 } // namespace kudu
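
As an illustration of the PB serialization mode declared above, the following sketch frames an opaque body with a four-byte big-endian length prefix, matching what the .cc file does with NetworkByteOrder::Store32 and Load32. FrameBody, UnframeBody and kMaxBodyBytes are hypothetical names for this example only; the body is treated as an already-serialized byte string, so no protobuf library is needed.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Assumption: mirrors the 1 MiB limit of kMaxMessageBytes in the commit.
constexpr uint32_t kMaxBodyBytes = 1024 * 1024;

// Hypothetical helper: prepends a four-byte big-endian size to 'body'.
std::string FrameBody(const std::string& body) {
  assert(body.size() <= kMaxBodyBytes);
  const uint32_t n = static_cast<uint32_t>(body.size());
  std::string wire;
  wire.push_back(static_cast<char>((n >> 24) & 0xff));
  wire.push_back(static_cast<char>((n >> 16) & 0xff));
  wire.push_back(static_cast<char>((n >> 8) & 0xff));
  wire.push_back(static_cast<char>(n & 0xff));
  wire.append(body);
  return wire;
}

// Hypothetical helper: validates the prefix and extracts the body.
// Returns false if the frame is truncated or the size exceeds the limit.
bool UnframeBody(const std::string& wire, std::string* body) {
  if (wire.size() < 4) return false;
  const uint32_t n =
      (static_cast<uint32_t>(static_cast<uint8_t>(wire[0])) << 24) |
      (static_cast<uint32_t>(static_cast<uint8_t>(wire[1])) << 16) |
      (static_cast<uint32_t>(static_cast<uint8_t>(wire[2])) << 8) |
      static_cast<uint32_t>(static_cast<uint8_t>(wire[3]));
  if (n > kMaxBodyBytes) return false;
  if (wire.size() - 4 < n) return false;
  body->assign(wire, 4, n);
  return true;
}
```

Checking the size against the limit before allocating the body buffer is the same ordering ReceiveMessage() uses, so a corrupt prefix cannot trigger a huge allocation.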

http://git-wip-us.apache.org/repos/asf/kudu/blob/761ce10b/src/kudu/tools/tool_action_test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/tool_action_test.cc b/src/kudu/tools/tool_action_test.cc
new file mode 100644
index 0000000..0373577
--- /dev/null
+++ b/src/kudu/tools/tool_action_test.cc
@@ -0,0 +1,392 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/tools/tool_action.h"
+
+#include <unistd.h>
+
+#include <cstdlib>
+#include <map>
+#include <memory>
+#include <string>
+#include <utility>
+
+#include <boost/algorithm/string/predicate.hpp>
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+#include <google/protobuf/stubs/status.h>
+#include <google/protobuf/stubs/stringpiece.h>
+#include <google/protobuf/util/json_util.h>
+
+#include "kudu/common/common.pb.h"
+#include "kudu/common/wire_protocol.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/mini-cluster/external_mini_cluster.h"
+#include "kudu/security/test/mini_kdc.h"
+#include "kudu/tools/tool.pb.h"
+#include "kudu/tools/tool_action_common.h"
+#include "kudu/util/env.h"
+#include "kudu/util/env_util.h"
+#include "kudu/util/path_util.h"
+#include "kudu/util/pb_util.h"
+#include "kudu/util/status.h"
+
+DEFINE_string(serialization, "json", "Serialization method to be used by the "
+              "control shell. Valid values are 'json' (protobuf serialized "
+              "into JSON and terminated with a newline character) or 'pb' "
+              "(four byte protobuf message length in big endian followed by "
+              "the protobuf message itself).");
+DEFINE_validator(serialization, [](const char* /*n*/, const std::string& v) {
+  return boost::iequals(v, "pb") ||
+         boost::iequals(v, "json");
+});
+
+namespace kudu {
+
+namespace tools {
+
+using cluster::ExternalDaemon;
+using cluster::ExternalMiniCluster;
+using cluster::ExternalMiniClusterOptions;
+using std::string;
+using std::unique_ptr;
+using strings::Substitute;
+
+namespace {
+
+Status MakeDataRoot(string* data_root) {
+  // The ExternalMiniCluster can't generate the data root on our behalf because
+  // we're not running inside a gtest. So we'll use this approach instead,
+  // which is what the Java external mini cluster used for a long time.
+  const char* tmpdir = getenv("TEST_TMPDIR");
+  string tmpdir_str = tmpdir ? tmpdir : Substitute("/tmp/kudutest-$0", getuid());
+  string root = JoinPathSegments(tmpdir_str, "minicluster-data");
+  RETURN_NOT_OK(env_util::CreateDirsRecursively(Env::Default(), root));
+
+  *data_root = root;
+  return Status::OK();
+}
+
+Status CheckClusterExists(const unique_ptr<ExternalMiniCluster>& cluster) {
+  if (!cluster) {
+    return Status::NotFound("cluster not found");
+  }
+  return Status::OK();
+}
+
+Status FindDaemon(const unique_ptr<ExternalMiniCluster>& cluster,
+                  const DaemonIdentifierPB& id,
+                  ExternalDaemon** daemon,
+                  MiniKdc** kdc) {
+  RETURN_NOT_OK(CheckClusterExists(cluster));
+
+  if (!id.has_type()) {
+    return Status::InvalidArgument("request is missing daemon type");
+  }
+
+  switch (id.type()) {
+    case MASTER:
+      if (!id.has_index()) {
+        return Status::InvalidArgument("request is missing daemon index");
+      }
+      if (id.index() >= cluster->num_masters()) {
+        return Status::NotFound(Substitute("no master with index $0",
+                                           id.index()));
+      }
+      *daemon = cluster->master(id.index());
+      *kdc = nullptr;
+      break;
+    case TSERVER:
+      if (!id.has_index()) {
+        return Status::InvalidArgument("request is missing daemon index");
+      }
+      if (id.index() >= cluster->num_tablet_servers()) {
+        return Status::NotFound(Substitute("no tserver with index $0",
+                                           id.index()));
+      }
+      *daemon = cluster->tablet_server(id.index());
+      *kdc = nullptr;
+      break;
+    case KDC:
+      if (!cluster->kdc()) {
+        return Status::NotFound("kdc not found");
+      }
+      *daemon = nullptr;
+      *kdc = cluster->kdc();
+      break;
+    default:
+      return Status::InvalidArgument(
+          Substitute("unknown daemon type: $0", DaemonType_Name(id.type())));
+  }
+  return Status::OK();
+}
+
+Status ProcessRequest(const ControlShellRequestPB& req,
+                      ControlShellResponsePB* resp,
+                      unique_ptr<ExternalMiniCluster>* cluster) {
+  switch (req.request_case()) {
+    case ControlShellRequestPB::kCreateCluster:
+    {
+      if (*cluster) {
+        RETURN_NOT_OK(Status::InvalidArgument("cluster already created"));
+      }
+      const CreateClusterRequestPB& cc = req.create_cluster();
+      ExternalMiniClusterOptions opts;
+      if (cc.has_num_masters()) {
+        if (cc.num_masters() != 1 && cc.num_masters() != 3) {
+          RETURN_NOT_OK(Status::InvalidArgument(
+              "only one or three masters are supported"));
+        }
+        opts.num_masters = cc.num_masters();
+      }
+      if (cc.has_num_tservers()) {
+        opts.num_tablet_servers = cc.num_tservers();
+      }
+      opts.enable_kerberos = cc.enable_kerberos();
+      if (cc.has_data_root()) {
+        opts.data_root = cc.data_root();
+      } else {
+        RETURN_NOT_OK(MakeDataRoot(&opts.data_root));
+      }
+      opts.extra_master_flags.assign(cc.extra_master_flags().begin(),
+                                     cc.extra_master_flags().end());
+      opts.extra_tserver_flags.assign(cc.extra_tserver_flags().begin(),
+                                      cc.extra_tserver_flags().end());
+      if (opts.num_masters > 1) {
+        opts.master_rpc_ports = { 11030, 11031, 11032 };
+      }
+      if (opts.enable_kerberos) {
+        opts.mini_kdc_options.data_root = JoinPathSegments(opts.data_root, "krb5kdc");
+      }
+
+      cluster->reset(new ExternalMiniCluster(std::move(opts)));
+      break;
+    }
+    case ControlShellRequestPB::kDestroyCluster:
+    {
+      RETURN_NOT_OK(CheckClusterExists(*cluster));
+      cluster->reset();
+      break;
+    }
+    case ControlShellRequestPB::kStartCluster:
+    {
+      RETURN_NOT_OK(CheckClusterExists(*cluster));
+      if ((*cluster)->num_masters() != 0) {
+        DCHECK_GT((*cluster)->num_tablet_servers(), 0);
+        RETURN_NOT_OK((*cluster)->Restart());
+      } else {
+        RETURN_NOT_OK((*cluster)->Start());
+      }
+      break;
+    }
+    case ControlShellRequestPB::kStopCluster:
+    {
+      RETURN_NOT_OK(CheckClusterExists(*cluster));
+      (*cluster)->Shutdown();
+      break;
+    }
+    case ControlShellRequestPB::kStartDaemon:
+    {
+      if (!req.start_daemon().has_id()) {
+        RETURN_NOT_OK(Status::InvalidArgument("missing process id"));
+      }
+      ExternalDaemon* daemon;
+      MiniKdc* kdc;
+      RETURN_NOT_OK(FindDaemon(*cluster, req.start_daemon().id(), &daemon, &kdc));
+      if (daemon) {
+        DCHECK(!kdc);
+        RETURN_NOT_OK(daemon->Restart());
+      } else {
+        DCHECK(kdc);
+        RETURN_NOT_OK(kdc->Start());
+      }
+      break;
+    }
+    case ControlShellRequestPB::kStopDaemon:
+    {
+      if (!req.stop_daemon().has_id()) {
+        RETURN_NOT_OK(Status::InvalidArgument("missing process id"));
+      }
+      ExternalDaemon* daemon;
+      MiniKdc* kdc;
+      RETURN_NOT_OK(FindDaemon(*cluster, req.stop_daemon().id(), &daemon, &kdc));
+      if (daemon) {
+        DCHECK(!kdc);
+        daemon->Shutdown();
+      } else {
+        DCHECK(kdc);
+        RETURN_NOT_OK(kdc->Stop());
+      }
+      break;
+    }
+    case ControlShellRequestPB::kGetMasters:
+    {
+      RETURN_NOT_OK(CheckClusterExists(*cluster));
+      for (int i = 0; i < (*cluster)->num_masters(); i++) {
+        HostPortPB pb;
+        RETURN_NOT_OK(HostPortToPB((*cluster)->master(i)->bound_rpc_hostport(), &pb));
+        DaemonInfoPB* info = resp->mutable_get_masters()->mutable_masters()->Add();
+        info->mutable_id()->set_type(MASTER);
+        info->mutable_id()->set_index(i);
+        *info->mutable_bound_rpc_address() = std::move(pb);
+      }
+      break;
+    }
+    case ControlShellRequestPB::kGetTservers:
+    {
+      RETURN_NOT_OK(CheckClusterExists(*cluster));
+      for (int i = 0; i < (*cluster)->num_tablet_servers(); i++) {
+        HostPortPB pb;
+        RETURN_NOT_OK(HostPortToPB((*cluster)->tablet_server(i)->bound_rpc_hostport(), &pb));
+        DaemonInfoPB* info = resp->mutable_get_tservers()->mutable_tservers()->Add();
+        info->mutable_id()->set_type(TSERVER);
+        info->mutable_id()->set_index(i);
+        *info->mutable_bound_rpc_address() = std::move(pb);
+      }
+      break;
+    }
+    case ControlShellRequestPB::kGetKdcEnvVars:
+    {
+      if (!*cluster || !(*cluster)->kdc()) {
+        RETURN_NOT_OK(Status::NotFound("kdc not found"));
+      }
+      auto env_vars = (*cluster)->kdc()->GetEnvVars();
+      resp->mutable_get_kdc_env_vars()->mutable_env_vars()->insert(
+          env_vars.begin(), env_vars.end());
+      break;
+    }
+    default:
+      RETURN_NOT_OK(Status::InvalidArgument("unknown cluster control request"));
+  }
+
+  return Status::OK();
+}
+
+Status RunControlShell(const RunnerContext& /*context*/) {
+  // Set up the protocol.
+  //
+  // Because we use stdin and stdout to communicate with the shell's parent,
+  // it's critical that none of our subprocesses write to stdout. To that end,
+  // the protocol will use stdout via another fd, and we'll redirect fd 1 to stderr.
+  int new_stdout = dup(STDOUT_FILENO);
+  PCHECK(new_stdout != -1);
+  PCHECK(dup2(STDERR_FILENO, STDOUT_FILENO) == STDOUT_FILENO);
+  ControlShellProtocol::SerializationMode serde_mode;
+  if (boost::iequals(FLAGS_serialization, "json")) {
+    serde_mode = ControlShellProtocol::SerializationMode::JSON;
+  } else {
+    DCHECK(boost::iequals(FLAGS_serialization, "pb"));
+    serde_mode = ControlShellProtocol::SerializationMode::PB;
+  }
+  ControlShellProtocol protocol(serde_mode,
+                                ControlShellProtocol::CloseMode::NO_CLOSE_ON_DESTROY,
+                                STDIN_FILENO,
+                                new_stdout);
+
+  // Run the shell loop, processing each message as it is received.
+  unique_ptr<ExternalMiniCluster> cluster;
+  while (true) {
+    ControlShellRequestPB req;
+    ControlShellResponsePB resp;
+
+    // Receive a new request, blocking until one is received.
+    //
+    // IO errors are fatal while others will result in an error response.
+    Status s = protocol.ReceiveMessage(&req);
+    if (s.IsEndOfFile()) {
+      break;
+    }
+    if (s.IsIOError()) {
+      return s;
+    }
+
+    // If we've made it here, we're definitely going to respond.
+
+    if (s.ok()) {
+      // We've successfully received a message. Try to process it.
+      s = ProcessRequest(req, &resp, &cluster);
+    }
+
+    if (!s.ok()) {
+      // This may be the result of ReceiveMessage() or ProcessRequest(),
+      // whichever failed first.
+      StatusToPB(s, resp.mutable_error());
+    }
+
+    // Send the response. All errors are fatal.
+    s = protocol.SendMessage(resp);
+    if (s.IsEndOfFile()) {
+      break;
+    }
+    RETURN_NOT_OK(s);
+  }
+
+  // Normal exit, clean up data root.
+  if (cluster) {
+    cluster->Shutdown();
+    WARN_NOT_OK(Env::Default()->DeleteRecursively(cluster->data_root()),
+                "Could not delete data root");
+  }
+  return Status::OK();
+}
+
+string SerializeRequest(const ControlShellRequestPB& req) {
+  string serialized;
+  auto google_status = google::protobuf::util::MessageToJsonString(
+      req, &serialized);
+  CHECK(google_status.ok()) << Substitute(
+      "unable to serialize JSON ($0): $1",
+      google_status.error_message().ToString(), pb_util::SecureDebugString(req));
+  return serialized;
+}
+
+} // anonymous namespace
+
+unique_ptr<Mode> BuildTestMode() {
+
+  ControlShellRequestPB create;
+  create.mutable_create_cluster()->set_num_tservers(3);
+  ControlShellRequestPB start;
+  start.mutable_start_cluster();
+
+  string extra = Substitute(
+      "The protocol for the control shell is protobuf-based and is documented "
+      "in src/kudu/tools/tool.proto. It is currently considered to be highly "
+      "experimental and subject to change.\n"
+      "\n"
+      "Example JSON input to create and start a cluster:\n"
+      "    $0\n"
+      "    $1\n",
+      SerializeRequest(create),
+      SerializeRequest(start));
+
+  unique_ptr<Action> control_shell =
+      ActionBuilder("mini_cluster", &RunControlShell)
+      .Description("Spawn a control shell for running a mini-cluster")
+      .ExtraDescription(extra)
+      .AddOptionalParameter("serialization")
+      .Build();
+
+  return ModeBuilder("test")
+      .Description("Various test actions")
+      .AddAction(std::move(control_shell))
+      .Build();
+}
+
+} // namespace tools
+} // namespace kudu
+
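
The stdout handling at the top of RunControlShell() can be sketched in isolation as follows. RedirectAndKeepOriginal is a hypothetical helper written for this note, not a function from the commit; it generalizes the dup()/dup2() pair to arbitrary descriptors so that it can be exercised with pipes instead of the real stdout.

```cpp
#include <cassert>
#include <unistd.h>

// Hypothetical helper: makes 'fd' refer to whatever 'replacement_fd' refers
// to, and returns a private duplicate of the original 'fd' (or -1 on error).
// Writers that keep using 'fd' land on the replacement, while the returned
// descriptor still reaches the original destination.
int RedirectAndKeepOriginal(int fd, int replacement_fd) {
  assert(fd != replacement_fd);  // dup2() would be a no-op in that case.
  int saved = dup(fd);
  if (saved == -1) return -1;
  if (dup2(replacement_fd, fd) != fd) {
    close(saved);
    return -1;
  }
  return saved;
}
```

Under these assumptions, the commit's behavior corresponds to calling RedirectAndKeepOriginal(STDOUT_FILENO, STDERR_FILENO) and handing the returned descriptor to the protocol as its write fd, so that anything a subprocess prints to fd 1 ends up on stderr and cannot corrupt the message stream.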

http://git-wip-us.apache.org/repos/asf/kudu/blob/761ce10b/src/kudu/tools/tool_main.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/tool_main.cc b/src/kudu/tools/tool_main.cc
index 31f722e..4622074 100644
--- a/src/kudu/tools/tool_main.cc
+++ b/src/kudu/tools/tool_main.cc
@@ -70,6 +70,7 @@ unique_ptr<Mode> RootMode(const string& name) {
       .AddMode(BuildRemoteReplicaMode())
       .AddMode(BuildTableMode())
       .AddMode(BuildTabletMode())
+      .AddMode(BuildTestMode())
       .AddMode(BuildTServerMode())
       .AddMode(BuildWalMode())
       .Build();

