From commits-return-5937-archive-asf-public=cust-asf.ponee.io@kudu.apache.org Thu May 3 23:43:34 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 8F640180677 for ; Thu, 3 May 2018 23:43:32 +0200 (CEST) Received: (qmail 17534 invoked by uid 500); 3 May 2018 21:43:31 -0000 Mailing-List: contact commits-help@kudu.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kudu.apache.org Delivered-To: mailing list commits@kudu.apache.org Received: (qmail 17525 invoked by uid 99); 3 May 2018 21:43:31 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 03 May 2018 21:43:31 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 52000F693D; Thu, 3 May 2018 21:43:31 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: danburkert@apache.org To: commits@kudu.apache.org Date: Thu, 03 May 2018 21:43:32 -0000 Message-Id: In-Reply-To: <08b908b53e784eae9930ae01edba6a6d@git.apache.org> References: <08b908b53e784eae9930ae01edba6a6d@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] kudu git commit: mini-cluster: support parallel multi-master clusters mini-cluster: support parallel multi-master clusters This commit refactors the mini-clusters to internally use reserved sockets for their child daemons. Reserved sockets are simply sockets bound to a random port with SO_REUSEPORT. As a result, multi-master mini clusters no longer need hard-coded master ports or the associated ctest resource lock.
This also significantly lessens the chance of port conflicts, although they are still possible when masters are restarted during tests. To mitigate this on Linux, tests are still run in a unique subnet. Change-Id: I0b0ff7bfc179d8fdb1ed306d1bbd12acddeb060c Reviewed-on: http://gerrit.cloudera.org:8080/8280 Reviewed-by: Adar Dembo Tested-by: Dan Burkert Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/eb4d88f0 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/eb4d88f0 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/eb4d88f0 Branch: refs/heads/master Commit: eb4d88f0a376b13294c8985d76b3114eaa2e84ca Parents: ff4b67e Author: Dan Burkert Authored: Sun Oct 15 14:40:46 2017 -0700 Committer: Dan Burkert Committed: Thu May 3 21:43:01 2018 +0000 ---------------------------------------------------------------------- src/kudu/client/client-test.cc | 4 +- src/kudu/integration-tests/CMakeLists.txt | 25 ++-- .../catalog_manager_tsk-itest.cc | 1 - .../client-negotiation-failover-itest.cc | 2 - .../integration-tests/client-stress-test.cc | 1 - .../integration-tests/master-stress-test.cc | 3 +- .../master_cert_authority-itest.cc | 16 +-- .../integration-tests/master_failover-itest.cc | 4 +- .../integration-tests/master_migration-itest.cc | 107 ++++++++------- .../master_replication-itest.cc | 16 +-- .../integration-tests/security-faults-itest.cc | 1 - src/kudu/integration-tests/security-itest.cc | 1 - .../security-master-auth-itest.cc | 4 +- .../integration-tests/token_signer-itest.cc | 23 ++-- .../integration-tests/webserver-stress-itest.cc | 9 +- src/kudu/master/mini_master.cc | 1 + src/kudu/mini-cluster/CMakeLists.txt | 2 +- .../mini-cluster/external_mini_cluster-test.cc | 6 +- src/kudu/mini-cluster/external_mini_cluster.cc | 119 ++++++----------- src/kudu/mini-cluster/external_mini_cluster.h | 20 +-- src/kudu/mini-cluster/internal_mini_cluster.cc | 130 +++++++++---------- 
src/kudu/mini-cluster/internal_mini_cluster.h | 26 +--- src/kudu/mini-cluster/mini_cluster.cc | 21 +++ src/kudu/mini-cluster/mini_cluster.h | 19 ++- src/kudu/tools/CMakeLists.txt | 2 +- src/kudu/tools/ksck_remote-test.cc | 7 +- src/kudu/tools/tool_action_test.cc | 3 - 27 files changed, 248 insertions(+), 325 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/eb4d88f0/src/kudu/client/client-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc index 816fb39..28f12e2 100644 --- a/src/kudu/client/client-test.cc +++ b/src/kudu/client/client-test.cc @@ -774,7 +774,7 @@ TEST_F(ClientTest, TestBadTable) { // Test that, if the master is down, we experience a network error talking // to it (no "find the new leader master" since there's only one master). TEST_F(ClientTest, TestMasterDown) { - cluster_->mini_master()->Shutdown(); + cluster_->ShutdownNodes(cluster::ClusterNodes::MASTERS_ONLY); shared_ptr t; client_->data_->default_admin_operation_timeout_ = MonoDelta::FromSeconds(1); Status s = client_->OpenTable("other-tablet", &t); @@ -2643,7 +2643,7 @@ void ClientTest::DoTestWriteWithDeadServer(WhichServerToKill which) { // Shut down the server. 
switch (which) { case DEAD_MASTER: - cluster_->mini_master()->Shutdown(); + cluster_->ShutdownNodes(cluster::ClusterNodes::MASTERS_ONLY); break; case DEAD_TSERVER: cluster_->mini_tablet_server(0)->Shutdown(); http://git-wip-us.apache.org/repos/asf/kudu/blob/eb4d88f0/src/kudu/integration-tests/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/src/kudu/integration-tests/CMakeLists.txt b/src/kudu/integration-tests/CMakeLists.txt index f5497e2..60ff72a 100644 --- a/src/kudu/integration-tests/CMakeLists.txt +++ b/src/kudu/integration-tests/CMakeLists.txt @@ -53,7 +53,6 @@ add_dependencies(itest_util # Tests set(KUDU_TEST_LINK_LIBS itest_util ${KUDU_MIN_TEST_LIBS}) ADD_KUDU_TEST(all_types-itest - RESOURCE_LOCK "master-rpc-ports" PROCESSORS 4 NUM_SHARDS 8) ADD_KUDU_TEST(alter_table-randomized-test @@ -62,10 +61,8 @@ ADD_KUDU_TEST(alter_table-test PROCESSORS 3) ADD_KUDU_TEST(authn_token_expire-itest) ADD_KUDU_TEST(catalog_manager_tsk-itest PROCESSORS 2) ADD_KUDU_TEST(client_failover-itest) -ADD_KUDU_TEST(client-negotiation-failover-itest - RESOURCE_LOCK "master-rpc-ports") +ADD_KUDU_TEST(client-negotiation-failover-itest) ADD_KUDU_TEST(client-stress-test - RESOURCE_LOCK "master-rpc-ports" RUN_SERIAL true) ADD_KUDU_TEST(consensus_peer_health_status-itest) ADD_KUDU_TEST(consistency-itest PROCESSORS 5) @@ -81,16 +78,16 @@ ADD_KUDU_TEST(flex_partitioning-itest TIMEOUT 1800 NUM_SHARDS 8 PROCESSORS 2) ADD_KUDU_TEST(full_stack-insert-scan-test RUN_SERIAL true) ADD_KUDU_TEST(fuzz-itest RUN_SERIAL true) ADD_KUDU_TEST(heavy-update-compaction-itest RUN_SERIAL true) -ADD_KUDU_TEST(linked_list-test RESOURCE_LOCK "master-rpc-ports" RUN_SERIAL true) +ADD_KUDU_TEST(linked_list-test RUN_SERIAL true) ADD_KUDU_TEST(log-rolling-itest) -ADD_KUDU_TEST(master_cert_authority-itest RESOURCE_LOCK "master-rpc-ports" PROCESSORS 2) -ADD_KUDU_TEST(master_failover-itest RESOURCE_LOCK "master-rpc-ports" PROCESSORS 3) 
+ADD_KUDU_TEST(master_cert_authority-itest PROCESSORS 2) +ADD_KUDU_TEST(master_failover-itest PROCESSORS 3) ADD_KUDU_TEST(master_hms-itest RUN_SERIAL true PROCESSORS 4) -ADD_KUDU_TEST(master_migration-itest RESOURCE_LOCK "master-rpc-ports") +ADD_KUDU_TEST(master_migration-itest) ADD_KUDU_TEST_DEPENDENCIES(master_migration-itest kudu) -ADD_KUDU_TEST(master_replication-itest RESOURCE_LOCK "master-rpc-ports") -ADD_KUDU_TEST(master-stress-test RESOURCE_LOCK "master-rpc-ports" RUN_SERIAL true) +ADD_KUDU_TEST(master_replication-itest) +ADD_KUDU_TEST(master-stress-test RUN_SERIAL true) ADD_KUDU_TEST(multidir_cluster-itest) ADD_KUDU_TEST(open-readonly-fs-itest PROCESSORS 4) ADD_KUDU_TEST(raft_config_change-itest) @@ -102,8 +99,8 @@ ADD_KUDU_TEST(raft_consensus-itest RUN_SERIAL true NUM_SHARDS 6) ADD_KUDU_TEST(replace_tablet-itest) ADD_KUDU_TEST(registration-test RESOURCE_LOCK "master-web-port") ADD_KUDU_TEST(security-faults-itest) -ADD_KUDU_TEST(security-itest RESOURCE_LOCK "master-rpc-ports") -ADD_KUDU_TEST(security-master-auth-itest RESOURCE_LOCK "master-rpc-ports") +ADD_KUDU_TEST(security-itest) +ADD_KUDU_TEST(security-master-auth-itest) ADD_KUDU_TEST(security-unknown-tsk-itest PROCESSORS 4) ADD_KUDU_TEST(stop_tablet-itest PROCESSORS 4) ADD_KUDU_TEST(table_locations-itest) @@ -114,11 +111,11 @@ ADD_KUDU_TEST(tablet_replacement-itest) ADD_KUDU_TEST(tombstoned_voting-imc-itest) ADD_KUDU_TEST(tombstoned_voting-itest) ADD_KUDU_TEST(tombstoned_voting-stress-test RUN_SERIAL true) -ADD_KUDU_TEST(token_signer-itest RESOURCE_LOCK "master-rpc-ports") +ADD_KUDU_TEST(token_signer-itest) ADD_KUDU_TEST(ts_recovery-itest PROCESSORS 4) ADD_KUDU_TEST(ts_tablet_manager-itest) ADD_KUDU_TEST(update_scan_delta_compact-test RUN_SERIAL true) -ADD_KUDU_TEST(webserver-stress-itest RESOURCE_LOCK "master-rpc-ports" RUN_SERIAL true) +ADD_KUDU_TEST(webserver-stress-itest RUN_SERIAL true) ADD_KUDU_TEST(write_throttling-itest) if (NOT APPLE) 
http://git-wip-us.apache.org/repos/asf/kudu/blob/eb4d88f0/src/kudu/integration-tests/catalog_manager_tsk-itest.cc ---------------------------------------------------------------------- diff --git a/src/kudu/integration-tests/catalog_manager_tsk-itest.cc b/src/kudu/integration-tests/catalog_manager_tsk-itest.cc index 9812102..12169af 100644 --- a/src/kudu/integration-tests/catalog_manager_tsk-itest.cc +++ b/src/kudu/integration-tests/catalog_manager_tsk-itest.cc @@ -84,7 +84,6 @@ class CatalogManagerTskITest : public KuduTest { #endif { cluster_opts_.num_masters = num_masters_; - cluster_opts_.master_rpc_ports = { 11030, 11031, 11032 }; cluster_opts_.num_tablet_servers = num_tservers_; // Add master-only flags. http://git-wip-us.apache.org/repos/asf/kudu/blob/eb4d88f0/src/kudu/integration-tests/client-negotiation-failover-itest.cc ---------------------------------------------------------------------- diff --git a/src/kudu/integration-tests/client-negotiation-failover-itest.cc b/src/kudu/integration-tests/client-negotiation-failover-itest.cc index bc21e8a..4fee884 100644 --- a/src/kudu/integration-tests/client-negotiation-failover-itest.cc +++ b/src/kudu/integration-tests/client-negotiation-failover-itest.cc @@ -197,7 +197,6 @@ TEST_F(ClientFailoverOnNegotiationTimeoutITest, Kudu2021ConnectToMaster) { cluster_opts_.num_masters = kNumMasters; cluster_opts_.num_tablet_servers = 1; - cluster_opts_.master_rpc_ports = { 32037, 32038, 32039 }; ASSERT_OK(CreateAndStartCluster()); shared_ptr client; @@ -232,7 +231,6 @@ TEST_F(ClientFailoverOnNegotiationTimeoutITest, Kudu2021NegotiateWithMaster) { cluster_opts_.num_masters = kNumMasters; cluster_opts_.num_tablet_servers = 1; - cluster_opts_.master_rpc_ports = { 31037, 31038, 31039 }; ASSERT_OK(CreateAndStartCluster()); shared_ptr client; http://git-wip-us.apache.org/repos/asf/kudu/blob/eb4d88f0/src/kudu/integration-tests/client-stress-test.cc ---------------------------------------------------------------------- diff --git 
a/src/kudu/integration-tests/client-stress-test.cc b/src/kudu/integration-tests/client-stress-test.cc index 5146e13..49d9f32 100644 --- a/src/kudu/integration-tests/client-stress-test.cc +++ b/src/kudu/integration-tests/client-stress-test.cc @@ -76,7 +76,6 @@ class ClientStressTest : public KuduTest { ExternalMiniClusterOptions opts = default_opts(); if (multi_master()) { opts.num_masters = 3; - opts.master_rpc_ports = { 11010, 11011, 11012 }; } opts.num_tablet_servers = 3; cluster_.reset(new ExternalMiniCluster(std::move(opts))); http://git-wip-us.apache.org/repos/asf/kudu/blob/eb4d88f0/src/kudu/integration-tests/master-stress-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/integration-tests/master-stress-test.cc b/src/kudu/integration-tests/master-stress-test.cc index 0160c33..30bed01 100644 --- a/src/kudu/integration-tests/master-stress-test.cc +++ b/src/kudu/integration-tests/master-stress-test.cc @@ -117,8 +117,7 @@ class MasterStressTest : public KuduTest { KuduTest::SetUp(); ExternalMiniClusterOptions opts; - opts.master_rpc_ports = { 11010, 11011, 11012 }; - opts.num_masters = opts.master_rpc_ports.size(); + opts.num_masters = 3; opts.num_tablet_servers = 3; // TODO(dan): enable HMS integration. Currently the test fails when it's http://git-wip-us.apache.org/repos/asf/kudu/blob/eb4d88f0/src/kudu/integration-tests/master_cert_authority-itest.cc ---------------------------------------------------------------------- diff --git a/src/kudu/integration-tests/master_cert_authority-itest.cc b/src/kudu/integration-tests/master_cert_authority-itest.cc index 4687477..491c9c4 100644 --- a/src/kudu/integration-tests/master_cert_authority-itest.cc +++ b/src/kudu/integration-tests/master_cert_authority-itest.cc @@ -16,7 +16,6 @@ // under the License. 
#include -#include #include #include #include @@ -85,11 +84,7 @@ namespace master { class MasterCertAuthorityTest : public KuduTest { public: MasterCertAuthorityTest() { - // Hard-coded ports for the masters. This is safe, as this unit test - // runs under a resource lock (see CMakeLists.txt in this directory). - // TODO(aserbin): we should have a generic method to obtain n free ports. - opts_.master_rpc_ports = { 11010, 11011, 11012 }; - opts_.num_masters = opts_.master_rpc_ports.size(); + opts_.num_masters = 3; } void SetUp() override { @@ -363,11 +358,10 @@ class ConnectToClusterBaseTest : public KuduTest { public: ConnectToClusterBaseTest(int run_time_seconds, int latency_ms, - vector master_ports) + int num_masters) : run_time_seconds_(run_time_seconds), latency_ms_(latency_ms) { - cluster_opts_.master_rpc_ports = std::move(master_ports); - cluster_opts_.num_masters = cluster_opts_.master_rpc_ports.size(); + cluster_opts_.num_masters = num_masters; } void SetUp() override { @@ -427,7 +421,7 @@ class ConnectToClusterBaseTest : public KuduTest { class SingleMasterConnectToClusterTest : public ConnectToClusterBaseTest { public: SingleMasterConnectToClusterTest() - : ConnectToClusterBaseTest(5, 2500, { 11020 }) { + : ConnectToClusterBaseTest(5, 2500, 1) { // Add master-only flags. cluster_opts_.extra_master_flags.push_back(Substitute( "--catalog_manager_inject_latency_load_ca_info_ms=$0", latency_ms_)); @@ -446,7 +440,7 @@ class SingleMasterConnectToClusterTest : public ConnectToClusterBaseTest { class MultiMasterConnectToClusterTest : public ConnectToClusterBaseTest { public: MultiMasterConnectToClusterTest() - : ConnectToClusterBaseTest(120, 2000, { 11030, 11031, 11032 }) { + : ConnectToClusterBaseTest(120, 2000, 3) { constexpr int kHbIntervalMs = 100; // Add master-only flags. 
const vector master_flags = { http://git-wip-us.apache.org/repos/asf/kudu/blob/eb4d88f0/src/kudu/integration-tests/master_failover-itest.cc ---------------------------------------------------------------------- diff --git a/src/kudu/integration-tests/master_failover-itest.cc b/src/kudu/integration-tests/master_failover-itest.cc index 3cf4aa2..06498a4 100644 --- a/src/kudu/integration-tests/master_failover-itest.cc +++ b/src/kudu/integration-tests/master_failover-itest.cc @@ -78,8 +78,7 @@ class MasterFailoverTest : public KuduTest { }; MasterFailoverTest() { - opts_.master_rpc_ports = { 11010, 11011, 11012 }; - opts_.num_masters = num_masters_ = opts_.master_rpc_ports.size(); + opts_.num_masters = 3; opts_.num_tablet_servers = kNumTabletServerReplicas; opts_.enable_hive_metastore = true; @@ -151,7 +150,6 @@ class MasterFailoverTest : public KuduTest { } protected: - int num_masters_; ExternalMiniClusterOptions opts_; unique_ptr cluster_; shared_ptr client_; http://git-wip-us.apache.org/repos/asf/kudu/blob/eb4d88f0/src/kudu/integration-tests/master_migration-itest.cc ---------------------------------------------------------------------- diff --git a/src/kudu/integration-tests/master_migration-itest.cc b/src/kudu/integration-tests/master_migration-itest.cc index f0a43f4..72eb7e2 100644 --- a/src/kudu/integration-tests/master_migration-itest.cc +++ b/src/kudu/integration-tests/master_migration-itest.cc @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. 
-#include #include #include #include @@ -27,7 +26,6 @@ #include "kudu/client/client.h" #include "kudu/client/schema.h" #include "kudu/client/shared_ptr.h" -#include "kudu/gutil/port.h" #include "kudu/gutil/strings/strip.h" #include "kudu/gutil/strings/substitute.h" #include "kudu/master/sys_catalog.h" @@ -35,6 +33,8 @@ #include "kudu/mini-cluster/mini_cluster.h" #include "kudu/util/env.h" #include "kudu/util/net/net_util.h" +#include "kudu/util/net/sockaddr.h" +#include "kudu/util/net/socket.h" #include "kudu/util/path_util.h" #include "kudu/util/status.h" #include "kudu/util/subprocess.h" @@ -52,41 +52,18 @@ using kudu::client::KuduTableCreator; using kudu::client::sp::shared_ptr; using kudu::cluster::ExternalMiniCluster; using kudu::cluster::ExternalMiniClusterOptions; +using kudu::cluster::MiniCluster; using kudu::cluster::ScopedResumeExternalDaemon; using kudu::master::SysCatalogTable; using std::pair; using std::string; -using std::vector; using std::unique_ptr; +using std::vector; using strings::Substitute; namespace kudu { class MasterMigrationTest : public KuduTest { - public: - - virtual void SetUp() OVERRIDE { - KuduTest::SetUp(); - ASSERT_NO_FATAL_FAILURE(RestartCluster()); - } - - virtual void TearDown() OVERRIDE { - if (cluster_) { - cluster_->Shutdown(); - } - KuduTest::TearDown(); - } - - void RestartCluster() { - if (cluster_) { - cluster_->Shutdown(); - } - cluster_.reset(new ExternalMiniCluster()); - ASSERT_OK(cluster_->Start()); - } - - protected: - unique_ptr cluster_; }; static Status CreateTable(ExternalMiniCluster* cluster, @@ -107,23 +84,46 @@ static Status CreateTable(ExternalMiniCluster* cluster, // Tests migration of a deployment from one master to multiple masters. TEST_F(MasterMigrationTest, TestEndToEndMigration) { - const vector kMasterRpcPorts = { 11010, 11011, 11012 }; + const int kNumMasters = 3; + + // Collect and keep alive the set of master sockets bound with SO_REUSEPORT. 
+ // This allows the ports to be reserved up front, so that they won't be taken + // while the test restarts nodes. + vector> reserved_sockets; + vector master_rpc_addresses; + for (int i = 0; i < kNumMasters; i++) { + unique_ptr reserved_socket; + ASSERT_OK(MiniCluster::ReserveDaemonSocket(MiniCluster::MASTER, i, + MiniCluster::kDefaultBindMode, + &reserved_socket)); + Sockaddr addr; + ASSERT_OK(reserved_socket->GetSocketAddress(&addr)); + master_rpc_addresses.emplace_back(addr.host(), addr.port()); + reserved_sockets.emplace_back(std::move(reserved_socket)); + } + + ExternalMiniClusterOptions opts; + opts.num_masters = 1; + opts.master_rpc_addresses = { master_rpc_addresses[0] }; + opts.bind_mode = cluster::MiniCluster::LOOPBACK; + + unique_ptr cluster(new ExternalMiniCluster(opts)); + ASSERT_OK(cluster->Start()); + const string kTableName = "test"; - const string kBinPath = cluster_->GetBinaryPath("kudu"); + const string kBinPath = cluster->GetBinaryPath("kudu"); // Initial state: single-master cluster with one table. - ASSERT_OK(CreateTable(cluster_.get(), kTableName)); - cluster_->Shutdown(); + ASSERT_OK(CreateTable(cluster.get(), kTableName)); + cluster->Shutdown(); - // List of every master's UUID and port. Used when rewriting the single - // master's cmeta. - vector> master_uuids_and_ports; - master_uuids_and_ports.emplace_back(cluster_->master()->uuid(), kMasterRpcPorts[0]); + // List of every master UUIDs. + vector uuids = { cluster->master()->uuid() }; // Format a filesystem tree for each of the new masters and get the uuids. 
- for (int i = 1; i < kMasterRpcPorts.size(); i++) { - string data_root = cluster_->GetDataPath(Substitute("master-$0", i)); - string wal_dir = cluster_->GetWalPath(Substitute("master-$0", i)); + for (int i = 1; i < kNumMasters; i++) { + string data_root = cluster->GetDataPath(Substitute("master-$0", i)); + string wal_dir = cluster->GetWalPath(Substitute("master-$0", i)); ASSERT_OK(env_->CreateDir(DirName(data_root))); ASSERT_OK(env_->CreateDir(wal_dir)); { @@ -148,24 +148,24 @@ TEST_F(MasterMigrationTest, TestEndToEndMigration) { string uuid; ASSERT_OK(Subprocess::Call(args, "", &uuid)); StripWhiteSpace(&uuid); - master_uuids_and_ports.emplace_back(uuid, kMasterRpcPorts[i]); + uuids.emplace_back(uuid); } } // Rewrite the single master's cmeta to reflect the new Raft configuration. { - string data_root = cluster_->GetDataPath("master-0"); + string data_root = cluster->GetDataPath("master-0"); vector args = { kBinPath, "local_replica", "cmeta", "rewrite_raft_config", - "--fs_wal_dir=" + cluster_->GetWalPath("master-0"), + "--fs_wal_dir=" + cluster->GetWalPath("master-0"), "--fs_data_dirs=" + data_root, SysCatalogTable::kSysCatalogTabletId }; - for (const auto& m : master_uuids_and_ports) { - args.push_back(Substitute("$0:127.0.0.1:$1", m.first, m.second)); + for (int i = 0; i < kNumMasters; i++) { + args.emplace_back(Substitute("$0:$1", uuids[i], master_rpc_addresses[i].ToString())); } ASSERT_OK(Subprocess::Call(args)); } @@ -177,13 +177,13 @@ TEST_F(MasterMigrationTest, TestEndToEndMigration) { // made it aware that it should replicate to the new masters, but they're not // actually running. Thus, it cannot become leader or do any real work. But, // it can still service remote bootstrap requests. - NO_FATALS(RestartCluster()); + ASSERT_OK(cluster->Restart()); // Use remote bootstrap to copy the master tablet to each of the new masters' // filesystems. 
- for (int i = 1; i < kMasterRpcPorts.size(); i++) { - string data_root = cluster_->GetDataPath(Substitute("master-$0", i)); - string wal_dir = cluster_->GetWalPath(Substitute("master-$0", i)); + for (int i = 1; i < kNumMasters; i++) { + string data_root = cluster->GetDataPath(Substitute("master-$0", i)); + string wal_dir = cluster->GetWalPath(Substitute("master-$0", i)); vector args = { kBinPath, "local_replica", @@ -191,18 +191,15 @@ TEST_F(MasterMigrationTest, TestEndToEndMigration) { "--fs_wal_dir=" + wal_dir, "--fs_data_dirs=" + data_root, SysCatalogTable::kSysCatalogTabletId, - cluster_->master()->bound_rpc_hostport().ToString() + cluster->master()->bound_rpc_hostport().ToString() }; ASSERT_OK(Subprocess::Call(args)); } // Bring down the old cluster configuration and bring up the new one. - cluster_->Shutdown(); - ExternalMiniClusterOptions opts; - // Required since we use 127.0.0.1 in the config above. - opts.bind_mode = cluster::MiniCluster::LOOPBACK; - opts.master_rpc_ports = kMasterRpcPorts; - opts.num_masters = kMasterRpcPorts.size(); + cluster->Shutdown(); + opts.num_masters = 3; + opts.master_rpc_addresses = master_rpc_addresses; ExternalMiniCluster migrated_cluster(std::move(opts)); ASSERT_OK(migrated_cluster.Start()); @@ -221,7 +218,7 @@ TEST_F(MasterMigrationTest, TestEndToEndMigration) { // // Only in slow mode. 
if (AllowSlowTests()) { - for (int i = 0; i < migrated_cluster.num_masters(); i++) { + for (int i = 0; i < kNumMasters; i++) { ASSERT_OK(migrated_cluster.master(i)->Pause()); ScopedResumeExternalDaemon resume_daemon(migrated_cluster.master(i)); ASSERT_OK(client->OpenTable(kTableName, &table)); http://git-wip-us.apache.org/repos/asf/kudu/blob/eb4d88f0/src/kudu/integration-tests/master_replication-itest.cc ---------------------------------------------------------------------- diff --git a/src/kudu/integration-tests/master_replication-itest.cc b/src/kudu/integration-tests/master_replication-itest.cc index a521ee0..5013508 100644 --- a/src/kudu/integration-tests/master_replication-itest.cc +++ b/src/kudu/integration-tests/master_replication-itest.cc @@ -81,12 +81,7 @@ const int kNumTabletServerReplicas = 3; class MasterReplicationTest : public KuduTest { public: MasterReplicationTest() { - // Hard-coded ports for the masters. This is safe, as this unit test - // runs under a resource lock (see CMakeLists.txt in this directory). - // TODO we should have a generic method to obtain n free ports. 
- opts_.master_rpc_ports = { 11010, 11011, 11012 }; - - opts_.num_masters = num_masters_ = opts_.master_rpc_ports.size(); + opts_.num_masters = 3; opts_.num_tablet_servers = kNumTabletServerReplicas; } @@ -111,14 +106,14 @@ class MasterReplicationTest : public KuduTest { } void ListMasterServerAddrs(vector* out) { - for (int i = 0; i < num_masters_; i++) { - out->push_back(cluster_->mini_master(i)->bound_rpc_addr_str()); + for (const auto& hostport : cluster_->master_rpc_addrs()) { + out->emplace_back(hostport.ToString()); } } Status CreateClient(shared_ptr* out) { KuduClientBuilder builder; - for (int i = 0; i < num_masters_; i++) { + for (int i = 0; i < cluster_->num_masters(); i++) { if (!cluster_->mini_master(i)->master()->IsShutdown()) { builder.add_master_server_addr(cluster_->mini_master(i)->bound_rpc_addr_str()); } @@ -143,7 +138,6 @@ class MasterReplicationTest : public KuduTest { } protected: - int num_masters_; InternalMiniClusterOptions opts_; gscoped_ptr cluster_; }; @@ -291,8 +285,8 @@ TEST_F(MasterReplicationTest, TestHeartbeatAcceptedByAnyMaster) { TEST_F(MasterReplicationTest, TestMasterPeerSetsDontMatch) { // Restart one master with an additional entry in --master_addresses. The // discrepancy with the on-disk list of masters should trigger a failure. 
- cluster_->mini_master(0)->Shutdown(); vector master_rpc_addrs = cluster_->master_rpc_addrs(); + cluster_->mini_master(0)->Shutdown(); master_rpc_addrs.emplace_back("127.0.0.1", 55555); cluster_->mini_master(0)->SetMasterAddresses(master_rpc_addrs); ASSERT_OK(cluster_->mini_master(0)->Start()); http://git-wip-us.apache.org/repos/asf/kudu/blob/eb4d88f0/src/kudu/integration-tests/security-faults-itest.cc ---------------------------------------------------------------------- diff --git a/src/kudu/integration-tests/security-faults-itest.cc b/src/kudu/integration-tests/security-faults-itest.cc index 3873039..dec000e 100644 --- a/src/kudu/integration-tests/security-faults-itest.cc +++ b/src/kudu/integration-tests/security-faults-itest.cc @@ -80,7 +80,6 @@ class SecurityComponentsFaultsITest : public KuduTest { cluster_opts_.enable_kerberos = true; cluster_opts_.num_masters = num_masters_; - cluster_opts_.master_rpc_ports = { 11080, 11081, 11082 }; cluster_opts_.num_tablet_servers = num_tservers_; // KDC-related options http://git-wip-us.apache.org/repos/asf/kudu/blob/eb4d88f0/src/kudu/integration-tests/security-itest.cc ---------------------------------------------------------------------- diff --git a/src/kudu/integration-tests/security-itest.cc b/src/kudu/integration-tests/security-itest.cc index 0d12177..e3128b4 100644 --- a/src/kudu/integration-tests/security-itest.cc +++ b/src/kudu/integration-tests/security-itest.cc @@ -417,7 +417,6 @@ TEST_P(ConnectToFollowerMasterTest, AuthnTokenVerifierHaveKeys) { cluster_opts_.extra_master_flags.emplace_back( "--enable_leader_failure_detection=false"); cluster_opts_.num_masters = 3; - cluster_opts_.master_rpc_ports = { 11010, 11011, 11012, }; const auto& params = GetParam(); cluster_opts_.extra_master_flags.emplace_back( Substitute("--rpc_authentication=$0", params.rpc_authentication)); http://git-wip-us.apache.org/repos/asf/kudu/blob/eb4d88f0/src/kudu/integration-tests/security-master-auth-itest.cc 
---------------------------------------------------------------------- diff --git a/src/kudu/integration-tests/security-master-auth-itest.cc b/src/kudu/integration-tests/security-master-auth-itest.cc index 9bdede1..ffe0d02 100644 --- a/src/kudu/integration-tests/security-master-auth-itest.cc +++ b/src/kudu/integration-tests/security-master-auth-itest.cc @@ -16,7 +16,6 @@ // under the License. #include -#include #include #include @@ -53,8 +52,7 @@ class SecurityMasterAuthTest : public KuduTest { FLAGS_enable_leader_failure_detection = false; InternalMiniClusterOptions opts; - opts.master_rpc_ports = { 11010, 11011, 11012, 11013, 11014, }; - opts.num_masters = opts.master_rpc_ports.size(); + opts.num_masters = 5; opts.num_tablet_servers = 0; cluster_.reset(new InternalMiniCluster(env_, opts)); ASSERT_OK(cluster_->Start()); http://git-wip-us.apache.org/repos/asf/kudu/blob/eb4d88f0/src/kudu/integration-tests/token_signer-itest.cc ---------------------------------------------------------------------- diff --git a/src/kudu/integration-tests/token_signer-itest.cc b/src/kudu/integration-tests/token_signer-itest.cc index 103b4c7..d5468de 100644 --- a/src/kudu/integration-tests/token_signer-itest.cc +++ b/src/kudu/integration-tests/token_signer-itest.cc @@ -65,13 +65,8 @@ class TokenSignerITest : public KuduTest { FLAGS_authn_token_validity_seconds = authn_token_validity_seconds_; FLAGS_tsk_rotation_seconds = tsk_rotation_seconds_; - // Hard-coded ports for the masters. This is safe, as this unit test - // runs under a resource lock (see CMakeLists.txt in this directory). - // TODO(aserbin): we should have a generic method to obtain n free ports. 
- opts_.master_rpc_ports = { 11010, 11011, 11012 }; - - opts_.num_masters = num_masters_ = opts_.master_rpc_ports.size(); - opts_.num_tablet_servers = num_tablet_servers_; + opts_.num_masters = 3; + opts_.num_tablet_servers = 3; } void SetUp() override { @@ -110,7 +105,7 @@ class TokenSignerITest : public KuduTest { Status GetPublicKeys(int idx, vector* public_keys) { CHECK_GE(idx, 0); - CHECK_LT(idx, num_masters_); + CHECK_LT(idx, cluster_->num_masters()); MiniMaster* mm = cluster_->mini_master(idx); vector keys = mm->master()->token_verifier().ExportKeys(); @@ -128,8 +123,6 @@ class TokenSignerITest : public KuduTest { const int64_t authn_token_validity_seconds_ = 20; const int64_t tsk_rotation_seconds_ = 20; - int num_masters_; - const int num_tablet_servers_ = 3; InternalMiniClusterOptions opts_; unique_ptr cluster_; }; @@ -241,7 +234,7 @@ TEST_F(TokenSignerITest, AuthnTokenLifecycle) { ASSERT_OK(MakeSignedToken(&stoken)); ASSERT_EQ(key_seq_num, stoken.signing_key_seq_num()); - for (int i = 0; i < num_tablet_servers_; ++i) { + for (int i = 0; i < opts_.num_tablet_servers; ++i) { const tserver::TabletServer* ts = cluster_->mini_tablet_server(i)->server(); ASSERT_NE(nullptr, ts); TokenPB token; @@ -278,10 +271,10 @@ TEST_F(TokenSignerITest, AuthnTokenLifecycle) { const int64_t key_seq_num_token_eotai = stoken_eotai.signing_key_seq_num(); ASSERT_EQ(key_seq_num, key_seq_num_token_eotai); - vector expired_at_tserver(num_tablet_servers_, false); - vector valid_at_tserver(num_tablet_servers_, false); + vector expired_at_tserver(opts_.num_tablet_servers, false); + vector valid_at_tserver(opts_.num_tablet_servers, false); while (true) { - for (int i = 0; i < num_tablet_servers_; ++i) { + for (int i = 0; i < opts_.num_tablet_servers; ++i) { if (expired_at_tserver[i]) { continue; } @@ -321,7 +314,7 @@ TEST_F(TokenSignerITest, AuthnTokenLifecycle) { } // Wait until current TSK expires and try to verify the token again. 
// The token verification result should be EXPIRED_TOKEN. - for (int i = 0; i < num_tablet_servers_; ++i) { + for (int i = 0; i < opts_.num_tablet_servers; ++i) { const tserver::TabletServer* ts = cluster_->mini_tablet_server(i)->server(); ASSERT_NE(nullptr, ts); TokenPB token; http://git-wip-us.apache.org/repos/asf/kudu/blob/eb4d88f0/src/kudu/integration-tests/webserver-stress-itest.cc ---------------------------------------------------------------------- diff --git a/src/kudu/integration-tests/webserver-stress-itest.cc b/src/kudu/integration-tests/webserver-stress-itest.cc index 51e4083..a04e03d 100644 --- a/src/kudu/integration-tests/webserver-stress-itest.cc +++ b/src/kudu/integration-tests/webserver-stress-itest.cc @@ -47,13 +47,6 @@ TEST_F(KuduTest, TestWebUIDoesNotCrashCluster) { const int kNumTablets = 50; ExternalMiniClusterOptions opts; - // Force specific ports so that we can restart and guarantee we - // can bind the same port. If we use ephemeral ports, it's possible - // for one of the 'curl' threads to grab one of the ports as the local - // side of a client TCP connection while the server is down, preventing - // it from restarting. Choosing ports from the non-ephemeral range - // prevents this. 
- opts.master_rpc_ports = { 11010, 11011, 11012 }; #ifdef __linux__ // We can only do explicit webserver ports on Linux, where we use // IPs like 127.x.y.z to bind the minicluster servers to different @@ -63,7 +56,7 @@ TEST_F(KuduTest, TestWebUIDoesNotCrashCluster) { opts.extra_master_flags.emplace_back("-webserver_port=11013"); opts.extra_tserver_flags.emplace_back("-webserver_port=11014"); #endif - opts.num_masters = opts.master_rpc_ports.size(); + opts.num_masters = 3; ExternalMiniCluster cluster(opts); ASSERT_OK(cluster.Start()); http://git-wip-us.apache.org/repos/asf/kudu/blob/eb4d88f0/src/kudu/master/mini_master.cc ---------------------------------------------------------------------- diff --git a/src/kudu/master/mini_master.cc b/src/kudu/master/mini_master.cc index 9f8c0df..40a0ad9 100644 --- a/src/kudu/master/mini_master.cc +++ b/src/kudu/master/mini_master.cc @@ -56,6 +56,7 @@ MiniMaster::MiniMaster(string fs_root, HostPort rpc_bind_addr, int num_data_dirs FLAGS_enable_minidumps = false; HostPort web_bind_addr(rpc_bind_addr_.host(), /*port=*/ 0); opts_.rpc_opts.rpc_bind_addresses = rpc_bind_addr_.ToString(); + opts_.rpc_opts.rpc_reuseport = true; opts_.webserver_opts.bind_interface = web_bind_addr.host(); opts_.webserver_opts.port = web_bind_addr.port(); if (num_data_dirs == 1) { http://git-wip-us.apache.org/repos/asf/kudu/blob/eb4d88f0/src/kudu/mini-cluster/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/src/kudu/mini-cluster/CMakeLists.txt b/src/kudu/mini-cluster/CMakeLists.txt index 9d2e394..356e492 100644 --- a/src/kudu/mini-cluster/CMakeLists.txt +++ b/src/kudu/mini-cluster/CMakeLists.txt @@ -49,4 +49,4 @@ add_dependencies(mini_cluster # Tests set(KUDU_TEST_LINK_LIBS mini_cluster kudu_hms ${KUDU_MIN_TEST_LIBS}) -ADD_KUDU_TEST(external_mini_cluster-test RESOURCE_LOCK "master-rpc-ports" PROCESSORS 3) +ADD_KUDU_TEST(external_mini_cluster-test PROCESSORS 3) 
http://git-wip-us.apache.org/repos/asf/kudu/blob/eb4d88f0/src/kudu/mini-cluster/external_mini_cluster-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/mini-cluster/external_mini_cluster-test.cc b/src/kudu/mini-cluster/external_mini_cluster-test.cc index 2df19b7..cb25156 100644 --- a/src/kudu/mini-cluster/external_mini_cluster-test.cc +++ b/src/kudu/mini-cluster/external_mini_cluster-test.cc @@ -126,11 +126,7 @@ TEST_P(ExternalMiniClusterTest, TestBasicOperation) { opts.enable_kerberos = GetParam().first == Kerberos::ENABLED; opts.enable_hive_metastore = GetParam().second == HiveMetastore::ENABLED; - // Hard-coded RPC ports for the masters. This is safe, as this unit test - // runs under a resource lock (see CMakeLists.txt in this directory). - // TODO(af) we should have a generic method to obtain n free ports. - opts.master_rpc_ports = { 11010, 11011, 11012 }; - opts.num_masters = opts.master_rpc_ports.size(); + opts.num_masters = 3; opts.num_tablet_servers = 3; ExternalMiniCluster cluster(opts); http://git-wip-us.apache.org/repos/asf/kudu/blob/eb4d88f0/src/kudu/mini-cluster/external_mini_cluster.cc ---------------------------------------------------------------------- diff --git a/src/kudu/mini-cluster/external_mini_cluster.cc b/src/kudu/mini-cluster/external_mini_cluster.cc index d3e8b9b..8b300e3 100644 --- a/src/kudu/mini-cluster/external_mini_cluster.cc +++ b/src/kudu/mini-cluster/external_mini_cluster.cc @@ -59,6 +59,7 @@ #include "kudu/util/fault_injection.h" #include "kudu/util/monotime.h" #include "kudu/util/net/sockaddr.h" +#include "kudu/util/net/socket.h" #include "kudu/util/path_util.h" #include "kudu/util/pb_util.h" #include "kudu/util/status.h" @@ -201,17 +202,10 @@ Status ExternalMiniCluster::Start() { "Failed to start the Hive Metastore"); } - if (opts_.num_masters != 1) { - RETURN_NOT_OK_PREPEND(StartDistributedMasters(), - "Failed to add distributed masters"); - } else { - 
RETURN_NOT_OK_PREPEND(StartSingleMaster(), - Substitute("Failed to start a single Master")); - } + RETURN_NOT_OK_PREPEND(StartMasters(), "failed to start masters"); for (int i = 1; i <= opts_.num_tablet_servers; i++) { - RETURN_NOT_OK_PREPEND(AddTabletServer(), - Substitute("Failed starting tablet server $0", i)); + RETURN_NOT_OK_PREPEND(AddTabletServer(), Substitute("failed to start tablet server $0", i)); } RETURN_NOT_OK(WaitForTabletServerCount( opts_.num_tablet_servers, @@ -308,8 +302,7 @@ string ExternalMiniCluster::GetWalPath(const string& daemon_id) const { } namespace { -vector SubstituteInFlags(const vector& orig_flags, - int index) { +vector SubstituteInFlags(const vector& orig_flags, int index) { string str_index = strings::Substitute("$0", index); vector ret; for (const string& orig : orig_flags) { @@ -317,59 +310,43 @@ vector SubstituteInFlags(const vector& orig_flags, } return ret; } - } // anonymous namespace -Status ExternalMiniCluster::StartSingleMaster() { - string daemon_id = "master-0"; - - ExternalDaemonOptions opts; - opts.messenger = messenger_; - opts.exe = GetBinaryPath(kMasterBinaryName); - opts.wal_dir = GetWalPath(daemon_id); - opts.data_dirs = GetDataPaths(daemon_id); - opts.log_dir = GetLogPath(daemon_id); - opts.block_manager_type = opts_.block_manager_type; - if (FLAGS_perf_record) { - opts.perf_record_filename = - Substitute("$0/perf-$1.data", opts.log_dir, daemon_id); - } - opts.extra_flags = SubstituteInFlags(opts_.extra_master_flags, 0); - opts.start_process_timeout = opts_.start_process_timeout; - opts.logtostderr = opts_.logtostderr; - - opts.rpc_bind_address = HostPort(GetBindIpForMaster(0), 0); - if (opts_.enable_hive_metastore) { - opts.extra_flags.emplace_back(Substitute("--hive_metastore_uris=$0", hms_->uris())); - opts.extra_flags.emplace_back(Substitute("--hive_metastore_sasl_enabled=$0", - opts_.enable_kerberos)); - } - scoped_refptr master = new ExternalMaster(opts); - if (opts_.enable_kerberos) { - // The bind host 
here is the hostname that will be used to generate the - // Kerberos principal, so it has to match the bind address for the master - // rpc endpoint. - RETURN_NOT_OK_PREPEND(master->EnableKerberos(kdc_.get(), opts.rpc_bind_address.host()), - "could not enable Kerberos"); - } - - RETURN_NOT_OK(master->Start()); - masters_.push_back(master); - return Status::OK(); -} - -Status ExternalMiniCluster::StartDistributedMasters() { +Status ExternalMiniCluster::StartMasters() { int num_masters = opts_.num_masters; - if (opts_.master_rpc_ports.size() != num_masters) { - LOG(FATAL) << num_masters << " masters requested, but only " << - opts_.master_rpc_ports.size() << " ports specified in 'master_rpc_ports'"; + // Collect and keep alive the set of master sockets bound with SO_REUSEPORT + // until all master proccesses are started. This allows the mini-cluster to + // reserve a set of ports up front, then later start the set of masters, each + // configured with the full set of ports. + // + // TODO(dan): re-bind the ports between node restarts in order to prevent other + // processess from binding to them in the interim. 
+ vector> reserved_sockets; + vector master_rpc_addrs; + + if (!opts_.master_rpc_addresses.empty()) { + CHECK_EQ(opts_.master_rpc_addresses.size(), num_masters); + master_rpc_addrs = opts_.master_rpc_addresses; + } else { + for (int i = 0; i < num_masters; i++) { + unique_ptr reserved_socket; + RETURN_NOT_OK_PREPEND(ReserveDaemonSocket(MiniCluster::MASTER, i, opts_.bind_mode, + &reserved_socket), + "failed to reserve master socket address"); + Sockaddr addr; + RETURN_NOT_OK(reserved_socket->GetSocketAddress(&addr)); + master_rpc_addrs.emplace_back(addr.host(), addr.port()); + reserved_sockets.emplace_back(std::move(reserved_socket)); + } } - vector peer_hostports = master_rpc_addrs(); vector flags = opts_.extra_master_flags; - flags.push_back(Substitute("--master_addresses=$0", - HostPort::ToCommaSeparatedString(peer_hostports))); + flags.emplace_back("--rpc_reuseport=true"); + if (num_masters > 1) { + flags.emplace_back(Substitute("--master_addresses=$0", + HostPort::ToCommaSeparatedString(master_rpc_addrs))); + } string exe = GetBinaryPath(kMasterBinaryName); // Start the masters. 
@@ -389,7 +366,7 @@ Status ExternalMiniCluster::StartDistributedMasters() { } opts.extra_flags = SubstituteInFlags(flags, i); opts.start_process_timeout = opts_.start_process_timeout; - opts.rpc_bind_address = peer_hostports[i]; + opts.rpc_bind_address = master_rpc_addrs[i]; if (opts_.enable_hive_metastore) { opts.extra_flags.emplace_back(Substitute("--hive_metastore_uris=$0", hms_->uris())); opts.extra_flags.emplace_back(Substitute("--hive_metastore_sasl_enabled=$0", @@ -399,12 +376,12 @@ Status ExternalMiniCluster::StartDistributedMasters() { scoped_refptr peer = new ExternalMaster(opts); if (opts_.enable_kerberos) { - RETURN_NOT_OK_PREPEND(peer->EnableKerberos(kdc_.get(), peer_hostports[i].host()), + RETURN_NOT_OK_PREPEND(peer->EnableKerberos(kdc_.get(), master_rpc_addrs[i].host()), "could not enable Kerberos"); } RETURN_NOT_OK_PREPEND(peer->Start(), Substitute("Unable to start Master at index $0", i)); - masters_.push_back(peer); + masters_.emplace_back(std::move(peer)); } return Status::OK(); @@ -425,10 +402,7 @@ Status ExternalMiniCluster::AddTabletServer() { int idx = tablet_servers_.size(); string daemon_id = Substitute("ts-$0", idx); - vector master_hostports; - for (int i = 0; i < num_masters(); i++) { - master_hostports.push_back(DCHECK_NOTNULL(master(i))->bound_rpc_hostport()); - } + vector master_hostports = master_rpc_addrs(); string bind_host = GetBindIpForTabletServer(idx); ExternalDaemonOptions opts; @@ -447,8 +421,7 @@ Status ExternalMiniCluster::AddTabletServer() { opts.rpc_bind_address = HostPort(bind_host, 0); opts.logtostderr = opts_.logtostderr; - scoped_refptr ts = - new ExternalTabletServer(opts, master_hostports); + scoped_refptr ts = new ExternalTabletServer(opts, master_hostports); if (opts_.enable_kerberos) { RETURN_NOT_OK_PREPEND(ts->EnableKerberos(kdc_.get(), bind_host), "could not enable Kerberos"); @@ -637,17 +610,11 @@ vector ExternalMiniCluster::daemons() const { } vector ExternalMiniCluster::master_rpc_addrs() const { - if 
(opts_.num_masters == 1) { - const auto& addr = CHECK_NOTNULL(master(0))->bound_rpc_addr(); - return { HostPort(addr.host(), addr.port()) }; - } - vector master_rpc_addrs; - for (int i = 0; i < opts_.master_rpc_ports.size(); i++) { - master_rpc_addrs.emplace_back( - GetBindIpForDaemon(MiniCluster::MASTER, i, opts_.bind_mode), - opts_.master_rpc_ports[i]); + vector master_hostports; + for (const auto& master : masters_) { + master_hostports.emplace_back(master->bound_rpc_hostport()); } - return master_rpc_addrs; + return master_hostports; } std::shared_ptr ExternalMiniCluster::messenger() const { http://git-wip-us.apache.org/repos/asf/kudu/blob/eb4d88f0/src/kudu/mini-cluster/external_mini_cluster.h ---------------------------------------------------------------------- diff --git a/src/kudu/mini-cluster/external_mini_cluster.h b/src/kudu/mini-cluster/external_mini_cluster.h index a280640..238f8f9 100644 --- a/src/kudu/mini-cluster/external_mini_cluster.h +++ b/src/kudu/mini-cluster/external_mini_cluster.h @@ -118,10 +118,10 @@ struct ExternalMiniClusterOptions { std::vector extra_tserver_flags; std::vector extra_master_flags; - // If more than one master is specified, list of ports for the - // masters in a consensus configuration. Port at index 0 is used for the leader - // master. - std::vector master_rpc_ports; + // List of RPC bind addresses to use for masters. + // + // If unset, addresses are assigned automatically. + std::vector master_rpc_addresses; // Options to configure the MiniKdc before starting it up. // Only used when 'enable_kerberos' is 'true'. 
@@ -273,10 +273,6 @@ class ExternalMiniCluster : public MiniCluster { return opts_.bind_mode; } - std::vector master_rpc_ports() const override { - return opts_.master_rpc_ports; - } - std::vector master_rpc_addrs() const override; std::shared_ptr messenger() const override; @@ -354,17 +350,15 @@ class ExternalMiniCluster : public MiniCluster { private: FRIEND_TEST(MasterFailoverTest, TestKillAnyMaster); - Status StartSingleMaster(); - - Status StartDistributedMasters(); + Status StartMasters(); Status DeduceBinRoot(std::string* ret); Status HandleOptions(); ExternalMiniClusterOptions opts_; - std::vector > masters_; - std::vector > tablet_servers_; + std::vector> masters_; + std::vector> tablet_servers_; std::unique_ptr kdc_; std::unique_ptr hms_; http://git-wip-us.apache.org/repos/asf/kudu/blob/eb4d88f0/src/kudu/mini-cluster/internal_mini_cluster.cc ---------------------------------------------------------------------- diff --git a/src/kudu/mini-cluster/internal_mini_cluster.cc b/src/kudu/mini-cluster/internal_mini_cluster.cc index b9b3187..e272102 100644 --- a/src/kudu/mini-cluster/internal_mini_cluster.cc +++ b/src/kudu/mini-cluster/internal_mini_cluster.cc @@ -18,6 +18,7 @@ #include "kudu/mini-cluster/internal_mini_cluster.h" #include +#include #include #include #include @@ -41,11 +42,13 @@ #include "kudu/util/monotime.h" #include "kudu/util/net/net_util.h" #include "kudu/util/net/sockaddr.h" +#include "kudu/util/net/socket.h" #include "kudu/util/path_util.h" #include "kudu/util/status.h" #include "kudu/util/stopwatch.h" #include "kudu/util/test_util.h" +using std::unique_ptr; using std::string; using std::vector; using strings::Substitute; @@ -87,21 +90,11 @@ Status InternalMiniCluster::Start() { CHECK(!opts_.cluster_root.empty()) << "No cluster root was provided"; CHECK(!running_); - if (opts_.num_masters > 1) { - CHECK_GE(opts_.master_rpc_ports.size(), opts_.num_masters); - } - if (!env_->FileExists(opts_.cluster_root)) { 
RETURN_NOT_OK(env_->CreateDir(opts_.cluster_root)); } - // start the masters - if (opts_.num_masters > 1) { - RETURN_NOT_OK_PREPEND(StartDistributedMasters(), - "Couldn't start distributed masters"); - } else { - RETURN_NOT_OK_PREPEND(StartSingleMaster(), "Couldn't start the single master"); - } + RETURN_NOT_OK_PREPEND(StartMasters(), "Couldn't start masters"); for (int i = 0; i < opts_.num_tablet_servers; i++) { RETURN_NOT_OK_PREPEND(AddTabletServer(), @@ -121,29 +114,63 @@ Status InternalMiniCluster::Start() { return Status::OK(); } -Status InternalMiniCluster::StartDistributedMasters() { - CHECK_GT(opts_.num_data_dirs, 0); - CHECK_GE(opts_.master_rpc_ports.size(), opts_.num_masters); - CHECK_GT(opts_.master_rpc_ports.size(), 1); +Status InternalMiniCluster::StartMasters() { + int num_masters = opts_.num_masters; + + // Collect and keep alive the set of master sockets bound with SO_REUSEPORT + // until all master proccesses are started. This allows the mini-cluster to + // reserve a set of ports up front, then later start the set of masters, each + // configured with the full set of ports. + // + // TODO(dan): re-bind the ports between node restarts in order to prevent other + // processess from binding to them in the interim. + vector> reserved_sockets; + + if (mini_masters_.empty()) { + vector master_rpc_addrs; + for (int i = 0; i < num_masters; i++) { + unique_ptr reserved_socket; + RETURN_NOT_OK_PREPEND(ReserveDaemonSocket(MiniCluster::MASTER, i, opts_.bind_mode, + &reserved_socket), + "failed to reserve master socket address"); + + Sockaddr addr; + RETURN_NOT_OK(reserved_socket->GetSocketAddress(&addr)); + + master_rpc_addrs.emplace_back(addr.host(), addr.port()); + reserved_sockets.emplace_back(std::move(reserved_socket)); + } - vector master_rpc_addrs = this->master_rpc_addrs(); - LOG(INFO) << "Creating distributed mini masters. Addrs: " - << HostPort::ToCommaSeparatedString(master_rpc_addrs); + LOG(INFO) << "Creating distributed mini masters. 
Addrs: " + << HostPort::ToCommaSeparatedString(master_rpc_addrs); - for (int i = 0; i < opts_.num_masters; i++) { - shared_ptr mini_master(new MiniMaster(GetMasterFsRoot(i), master_rpc_addrs[i])); - mini_master->SetMasterAddresses(master_rpc_addrs); - RETURN_NOT_OK_PREPEND(mini_master->Start(), Substitute("Couldn't start follower $0", i)); - VLOG(1) << "Started MiniMaster with UUID " << mini_master->permanent_uuid() + for (int i = 0; i < num_masters; i++) { + shared_ptr mini_master(new MiniMaster(GetMasterFsRoot(i), master_rpc_addrs[i])); + if (num_masters > 1) { + mini_master->SetMasterAddresses(master_rpc_addrs); + } + mini_masters_.emplace_back(std::move(mini_master)); + } + } + + CHECK_EQ(num_masters, mini_masters_.size()); + for (int i = 0; i < num_masters; i++) { + RETURN_NOT_OK_PREPEND(mini_masters_[i]->Start(), Substitute("failed to start master $0", i)); + VLOG(1) << "Started MiniMaster with UUID " << mini_masters_[i]->permanent_uuid() << " at index " << i; - mini_masters_.push_back(std::move(mini_master)); } - int i = 0; - for (const shared_ptr& master : mini_masters_) { - LOG(INFO) << "Waiting to initialize catalog manager on master " << i++; - RETURN_NOT_OK_PREPEND(master->WaitForCatalogManagerInit(), + + for (int i = 0; i < num_masters; i++) { + LOG(INFO) << "Waiting to initialize catalog manager on master " << i; + RETURN_NOT_OK_PREPEND(mini_masters_[i]->WaitForCatalogManagerInit(), Substitute("Could not initialize catalog manager on master $0", i)); } + + if (num_masters == 1) { + RETURN_NOT_OK(mini_masters_[0]->master()->WaitUntilCatalogManagerIsLeaderAndReadyForTests( + MonoDelta::FromSeconds(kMasterStartupWaitTimeSeconds))); + } + return Status::OK(); } @@ -158,26 +185,6 @@ Status InternalMiniCluster::StartSync() { return Status::OK(); } -Status InternalMiniCluster::StartSingleMaster() { - CHECK_GT(opts_.num_data_dirs, 0); - CHECK_EQ(1, opts_.num_masters); - CHECK_LE(opts_.master_rpc_ports.size(), 1); - uint16_t master_rpc_port = 0; - if 
(opts_.master_rpc_ports.size() == 1) { - master_rpc_port = opts_.master_rpc_ports[0]; - } - - // start the master (we need the port to set on the servers). - string bind_ip = GetBindIpForDaemon(MiniCluster::MASTER, /*index=*/ 0, opts_.bind_mode); - shared_ptr mini_master(new MiniMaster(GetMasterFsRoot(0), - HostPort(std::move(bind_ip), master_rpc_port), opts_.num_data_dirs)); - RETURN_NOT_OK_PREPEND(mini_master->Start(), "Couldn't start master"); - RETURN_NOT_OK(mini_master->master()->WaitUntilCatalogManagerIsLeaderAndReadyForTests( - MonoDelta::FromSeconds(kMasterStartupWaitTimeSeconds))); - mini_masters_.push_back(std::move(mini_master)); - return Status::OK(); -} - Status InternalMiniCluster::AddTabletServer() { if (mini_masters_.empty()) { return Status::IllegalState("Master not yet initialized"); @@ -191,13 +198,11 @@ Status InternalMiniCluster::AddTabletServer() { string bind_ip = GetBindIpForDaemon(MiniCluster::TSERVER, new_idx, opts_.bind_mode); gscoped_ptr tablet_server(new MiniTabletServer(GetTabletServerFsRoot(new_idx), - HostPort(bind_ip, ts_rpc_port), opts_.num_data_dirs)); + HostPort(bind_ip, ts_rpc_port), + opts_.num_data_dirs)); // set the master addresses - tablet_server->options()->master_addresses.clear(); - for (const shared_ptr& master : mini_masters_) { - tablet_server->options()->master_addresses.emplace_back(master->bound_rpc_addr()); - } + tablet_server->options()->master_addresses = master_rpc_addrs(); RETURN_NOT_OK(tablet_server->Start()) mini_tablet_servers_.push_back(shared_ptr(tablet_server.release())); return Status::OK(); @@ -214,7 +219,6 @@ void InternalMiniCluster::ShutdownNodes(ClusterNodes nodes) { for (const shared_ptr& master_server : mini_masters_) { master_server->Shutdown(); } - mini_masters_.clear(); } running_ = false; } @@ -250,18 +254,12 @@ int InternalMiniCluster::tablet_server_index_by_uuid(const std::string& uuid) co } vector InternalMiniCluster::master_rpc_addrs() const { - if (opts_.num_masters == 1) { - const auto& 
addr = CHECK_NOTNULL(mini_master(0))->bound_rpc_addr(); - return { HostPort(addr.host(), addr.port()) }; - } - - vector master_rpc_addrs; - for (int i = 0; i < opts_.master_rpc_ports.size(); i++) { - master_rpc_addrs.emplace_back( - GetBindIpForDaemon(MiniCluster::MASTER, i, opts_.bind_mode), - opts_.master_rpc_ports[i]); + vector master_hostports; + for (const auto& master : mini_masters_) { + Sockaddr add = master->bound_rpc_addr(); + master_hostports.emplace_back(add.host(), add.port()); } - return master_rpc_addrs; + return master_hostports; } string InternalMiniCluster::GetMasterFsRoot(int idx) const { @@ -336,7 +334,7 @@ Status InternalMiniCluster::WaitForTabletServerCount(int count, } Status InternalMiniCluster::CreateClient(KuduClientBuilder* builder, - client::sp::shared_ptr* client) const { + client::sp::shared_ptr* client) const { client::KuduClientBuilder defaults; if (builder == nullptr) { builder = &defaults; http://git-wip-us.apache.org/repos/asf/kudu/blob/eb4d88f0/src/kudu/mini-cluster/internal_mini_cluster.h ---------------------------------------------------------------------- diff --git a/src/kudu/mini-cluster/internal_mini_cluster.h b/src/kudu/mini-cluster/internal_mini_cluster.h index eedac07..64b4e64 100644 --- a/src/kudu/mini-cluster/internal_mini_cluster.h +++ b/src/kudu/mini-cluster/internal_mini_cluster.h @@ -77,12 +77,6 @@ struct InternalMiniClusterOptions { MiniCluster::BindMode bind_mode; - // List of RPC ports for the master to run on. - // Defaults to an empty list. - // In single-master mode, an empty list implies port 0 (transient port). - // In multi-master mode, an empty list is illegal and will result in a CHECK failure. - std::vector master_rpc_ports; - // List of RPC ports for the tservers to run on. // Defaults to an empty list. 
// When adding a tablet server to the cluster via AddTabletServer(), if the @@ -109,14 +103,6 @@ class InternalMiniCluster : public MiniCluster { void ShutdownNodes(ClusterNodes nodes) override; - // Setup a consensus configuration of distributed masters, with count specified in - // 'options'. Requires that a reserve RPC port is specified in - // 'options' for each master. - Status StartDistributedMasters(); - - // Add a new standalone master to the cluster. The new master is started. - Status StartSingleMaster(); - // Add a new TS to the cluster. The new TS is started. // Requires that the master is already running. Status AddTabletServer(); @@ -167,10 +153,6 @@ class InternalMiniCluster : public MiniCluster { return opts_.bind_mode; } - std::vector master_rpc_ports() const override { - return opts_.master_rpc_ports; - } - std::vector master_rpc_addrs() const override; std::string GetMasterFsRoot(int idx) const; @@ -217,6 +199,10 @@ class InternalMiniCluster : public MiniCluster { std::shared_ptr master_proxy(int idx) const override; private: + + // Creates and starts the cluster masters. 
+ Status StartMasters(); + enum { kRegistrationWaitTimeSeconds = 15, kMasterStartupWaitTimeSeconds = 30, @@ -228,8 +214,8 @@ class InternalMiniCluster : public MiniCluster { bool running_; - std::vector > mini_masters_; - std::vector > mini_tablet_servers_; + std::vector> mini_masters_; + std::vector> mini_tablet_servers_; std::shared_ptr messenger_; http://git-wip-us.apache.org/repos/asf/kudu/blob/eb4d88f0/src/kudu/mini-cluster/mini_cluster.cc ---------------------------------------------------------------------- diff --git a/src/kudu/mini-cluster/mini_cluster.cc b/src/kudu/mini-cluster/mini_cluster.cc index b477e79..f67b432 100644 --- a/src/kudu/mini-cluster/mini_cluster.cc +++ b/src/kudu/mini-cluster/mini_cluster.cc @@ -22,12 +22,16 @@ #include #include #include +#include #include #include "kudu/gutil/strings/substitute.h" +#include "kudu/util/net/sockaddr.h" +#include "kudu/util/net/socket.h" using std::string; +using std::unique_ptr; using strings::Substitute; namespace kudu { @@ -66,5 +70,22 @@ string MiniCluster::GetBindIpForDaemon(DaemonType type, int index, BindMode bind } } +Status MiniCluster::ReserveDaemonSocket(DaemonType type, + int index, + BindMode bind_mode, + unique_ptr* socket) { + string ip = GetBindIpForDaemon(type, index, bind_mode); + Sockaddr sock_addr; + RETURN_NOT_OK(sock_addr.ParseString(ip, 0)); + + unique_ptr sock(new Socket()); + RETURN_NOT_OK(sock->Init(0)); + RETURN_NOT_OK(sock->SetReuseAddr(true)); + RETURN_NOT_OK(sock->SetReusePort(true)); + RETURN_NOT_OK(sock->Bind(sock_addr)); + *socket = std::move(sock); + return Status::OK(); +} + } // namespace cluster } // namespace kudu http://git-wip-us.apache.org/repos/asf/kudu/blob/eb4d88f0/src/kudu/mini-cluster/mini_cluster.h ---------------------------------------------------------------------- diff --git a/src/kudu/mini-cluster/mini_cluster.h b/src/kudu/mini-cluster/mini_cluster.h index 845b79c..304fd81 100644 --- a/src/kudu/mini-cluster/mini_cluster.h +++ 
b/src/kudu/mini-cluster/mini_cluster.h @@ -16,7 +16,6 @@ // under the License. #pragma once -#include #include #include #include @@ -28,6 +27,7 @@ namespace kudu { class Env; class HostPort; +class Socket; namespace client { class KuduClient; @@ -106,7 +106,8 @@ class MiniCluster { static constexpr const char* const kLoopbackIpAddr = "127.0.0.1"; MiniCluster() {} - virtual ~MiniCluster() {} + + virtual ~MiniCluster() = default; // Start the cluster. virtual Status Start() = 0; @@ -126,8 +127,9 @@ class MiniCluster { virtual BindMode bind_mode() const = 0; - virtual std::vector master_rpc_ports() const = 0; - + /// Returns the RPC addresses of all Master nodes in the cluster. + /// + /// REQUIRES: the cluster must have already been Start()ed. virtual std::vector master_rpc_addrs() const = 0; // Create a client configured to talk to this cluster. 'builder' may contain @@ -160,6 +162,15 @@ class MiniCluster { // Returns the Env on which the cluster operates. virtual Env* env() const = 0; + /// Reserves a unique socket address for a mini-cluster daemon. The address + /// can be ascertained through the returned socket, and will remain reserved + /// for the life of the socket. The daemon must use the SO_REUSEPORT socket + /// option when binding to the address. + static Status ReserveDaemonSocket(DaemonType type, + int index, + BindMode bind_mode, + std::unique_ptr* socket); + protected: // Return the IP address that the daemon with the given index will bind to. 
// If bind_mode is LOOPBACK, this will be 127.0.0.1 and if it is WILDCARD it http://git-wip-us.apache.org/repos/asf/kudu/blob/eb4d88f0/src/kudu/tools/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/src/kudu/tools/CMakeLists.txt b/src/kudu/tools/CMakeLists.txt index ad47c8d..3ee1634 100644 --- a/src/kudu/tools/CMakeLists.txt +++ b/src/kudu/tools/CMakeLists.txt @@ -143,7 +143,7 @@ set(KUDU_TEST_LINK_LIBS mini_cluster ${KUDU_MIN_TEST_LIBS}) ADD_KUDU_TEST(ksck-test) -ADD_KUDU_TEST(ksck_remote-test RESOURCE_LOCK "master-rpc-ports" PROCESSORS 3) +ADD_KUDU_TEST(ksck_remote-test PROCESSORS 3) ADD_KUDU_TEST(kudu-admin-test PROCESSORS 3) ADD_KUDU_TEST_DEPENDENCIES(kudu-admin-test kudu) http://git-wip-us.apache.org/repos/asf/kudu/blob/eb4d88f0/src/kudu/tools/ksck_remote-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tools/ksck_remote-test.cc b/src/kudu/tools/ksck_remote-test.cc index e969f33..59ff953 100644 --- a/src/kudu/tools/ksck_remote-test.cc +++ b/src/kudu/tools/ksck_remote-test.cc @@ -90,12 +90,7 @@ class RemoteKsckTest : public KuduTest { InternalMiniClusterOptions opts; - // Hard-coded ports for the masters. This is safe, as these tests run under - // a resource lock (see CMakeLists.txt in this directory). - // TODO we should have a generic method to obtain n free ports. 
- opts.master_rpc_ports = { 11010, 11011, 11012 }; - - opts.num_masters = opts.master_rpc_ports.size(); + opts.num_masters = 3; opts.num_tablet_servers = 3; mini_cluster_.reset(new InternalMiniCluster(env_, opts)); ASSERT_OK(mini_cluster_->Start()); http://git-wip-us.apache.org/repos/asf/kudu/blob/eb4d88f0/src/kudu/tools/tool_action_test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tools/tool_action_test.cc b/src/kudu/tools/tool_action_test.cc index ce0150c..a458b84 100644 --- a/src/kudu/tools/tool_action_test.cc +++ b/src/kudu/tools/tool_action_test.cc @@ -167,9 +167,6 @@ Status ProcessRequest(const ControlShellRequestPB& req, cc.extra_master_flags().end()); opts.extra_tserver_flags.assign(cc.extra_tserver_flags().begin(), cc.extra_tserver_flags().end()); - if (opts.num_masters > 1) { - opts.master_rpc_ports = { 11030, 11031, 11032 }; - } if (opts.enable_kerberos) { opts.mini_kdc_options.data_root = JoinPathSegments(opts.cluster_root, "krb5kdc"); opts.mini_kdc_options.ticket_lifetime = cc.mini_kdc_options().ticket_lifetime();