kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a...@apache.org
Subject [kudu] 01/03: [sentry] Integrate AuthzProvider into CatalogManager
Date Tue, 26 Mar 2019 05:09:55 GMT
This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit bc63454ee7fabf46298dfd5801a8ecd5d1bab787
Author: Hao Hao <hao.hao@cloudera.com>
AuthorDate: Sun Feb 3 20:40:44 2019 -0800

    [sentry] Integrate AuthzProvider into CatalogManager
    
    This commit enables master RPC authorization enforcement by connecting
    the CatalogManager to the Sentry service via the SentryAuthzProvider.
    When the Sentry integration is enabled (by setting the
    --sentry_service_rpc_addresses flag), DDLs such as table creation,
    alteration, and deletion are validated to see if the connected user has
    the permission to perform such operations. Note that the coarse-grained
    access control is still applied to these endpoints. A --trusted_user_acl
    flag is introduced to allow the trusted user, e.g. 'impala', to skip the
    authorization enforcement.
    
    Testing: This commit adds a new integration test (master_sentry-itest)
    which tests that the integration works as expected with all exposed
    table operations. More coverage on DDL stress tests with Sentry
    integration enabled will be in a follow-up patch.
    
    Change-Id: Iab4aa027ae6eb4520db48ce348db552c9feec2a8
    Reviewed-on: http://gerrit.cloudera.org:8080/11797
    Tested-by: Hao Hao <hao.hao@cloudera.com>
    Reviewed-by: Andrew Wong <awong@cloudera.com>
---
 src/kudu/client/client-test.cc                     |   3 +-
 src/kudu/common/table_util-test.cc                 |   1 +
 src/kudu/integration-tests/CMakeLists.txt          |   3 +-
 src/kudu/integration-tests/alter_table-test.cc     |   2 +-
 src/kudu/integration-tests/cluster_itest_util.cc   |   5 +
 src/kudu/integration-tests/cluster_itest_util.h    |   4 +-
 src/kudu/integration-tests/consistency-itest.cc    |   4 +-
 .../integration-tests/create-table-stress-test.cc  |  12 +-
 src/kudu/integration-tests/delete_table-itest.cc   |   4 +-
 .../integration-tests/flex_partitioning-itest.cc   |   5 +-
 src/kudu/integration-tests/hms_itest-base.cc       | 177 ++++++
 src/kudu/integration-tests/hms_itest-base.h        |  68 ++
 src/kudu/integration-tests/master_hms-itest.cc     | 218 ++-----
 src/kudu/integration-tests/master_sentry-itest.cc  | 681 +++++++++++++++++++--
 .../integration-tests/raft_config_change-itest.cc  |   7 +-
 .../raft_consensus_nonvoter-itest.cc               |   8 +-
 src/kudu/integration-tests/registration-test.cc    |   6 +-
 src/kudu/integration-tests/tablet_copy-itest.cc    |   5 +-
 .../integration-tests/tombstoned_voting-itest.cc   |   5 +-
 src/kudu/master/CMakeLists.txt                     |   1 +
 src/kudu/master/authz_provider.cc                  |  53 ++
 src/kudu/master/authz_provider.h                   |  10 +
 src/kudu/master/catalog_manager.cc                 | 286 +++++++--
 src/kudu/master/catalog_manager.h                  | 120 ++--
 src/kudu/master/master-test-util.h                 |   3 +-
 src/kudu/master/master.proto                       |   3 +
 src/kudu/master/master_service.cc                  |  19 +-
 src/kudu/master/sentry_authz_provider-test-base.h  | 134 ++++
 src/kudu/master/sentry_authz_provider-test.cc      | 166 ++---
 src/kudu/master/sentry_authz_provider.cc           |   4 +
 src/kudu/master/sentry_authz_provider.h            |   5 +-
 src/kudu/sentry/mini_sentry.cc                     |  11 +
 src/kudu/tools/rebalancer_tool-test.cc             |   3 +
 33 files changed, 1552 insertions(+), 484 deletions(-)

diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index 7404f4a..44d6f2a 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -154,6 +154,7 @@ using base::subtle::Atomic32;
 using base::subtle::NoBarrier_AtomicIncrement;
 using base::subtle::NoBarrier_Load;
 using base::subtle::NoBarrier_Store;
+using boost::none;
 using google::protobuf::util::MessageDifferencer;
 using kudu::cluster::InternalMiniCluster;
 using kudu::cluster::InternalMiniClusterOptions;
@@ -300,7 +301,7 @@ class ClientTest : public KuduTest {
         cluster_->mini_master()->master()->catalog_manager();
     CatalogManager::ScopedLeaderSharedLock l(catalog);
     CHECK_OK(l.first_failed_status());
-    CHECK_OK(catalog->GetTableLocations(&req, &resp));
+    CHECK_OK(catalog->GetTableLocations(&req, &resp, /*user=*/none));
     CHECK(resp.tablet_locations_size() > 0);
     return resp.tablet_locations(0).tablet_id();
   }
diff --git a/src/kudu/common/table_util-test.cc b/src/kudu/common/table_util-test.cc
index 01344e3..b351ebe 100644
--- a/src/kudu/common/table_util-test.cc
+++ b/src/kudu/common/table_util-test.cc
@@ -49,6 +49,7 @@ TEST(TestTableUtil, TestParseHiveTableIdentifier) {
   EXPECT_EQ("_leading_underscore", db);
   EXPECT_EQ("trailing_underscore_", tbl);
 
+  EXPECT_TRUE(ParseHiveTableIdentifier("", &db, &tbl).IsInvalidArgument());
   EXPECT_TRUE(ParseHiveTableIdentifier(".", &db, &tbl).IsInvalidArgument());
   EXPECT_TRUE(ParseHiveTableIdentifier("no-table", &db, &tbl).IsInvalidArgument());
   EXPECT_TRUE(ParseHiveTableIdentifier("lots.of.tables", &db, &tbl).IsInvalidArgument());
diff --git a/src/kudu/integration-tests/CMakeLists.txt b/src/kudu/integration-tests/CMakeLists.txt
index 18d7db4..fcd0b74 100644
--- a/src/kudu/integration-tests/CMakeLists.txt
+++ b/src/kudu/integration-tests/CMakeLists.txt
@@ -24,6 +24,7 @@ set(INTEGRATION_TESTS_SRCS
   cluster_itest_util.cc
   cluster_verifier.cc
   external_mini_cluster-itest-base.cc
+  hms_itest-base.cc
   internal_mini_cluster-itest-base.cc
   log_verifier.cc
   mini_cluster_fs_inspector.cc
@@ -89,7 +90,7 @@ ADD_KUDU_TEST(master_migration-itest)
 ADD_KUDU_TEST_DEPENDENCIES(master_migration-itest
   kudu)
 ADD_KUDU_TEST(master_replication-itest)
-ADD_KUDU_TEST(master_sentry-itest RUN_SERIAL true PROCESSORS 4)
+ADD_KUDU_TEST(master_sentry-itest RUN_SERIAL true NUM_SHARDS 8 PROCESSORS 4)
 ADD_KUDU_TEST(master-stress-test RUN_SERIAL true)
 ADD_KUDU_TEST(multidir_cluster-itest)
 ADD_KUDU_TEST(open-readonly-fs-itest PROCESSORS 4)
diff --git a/src/kudu/integration-tests/alter_table-test.cc b/src/kudu/integration-tests/alter_table-test.cc
index 7ffde6c..79b7396 100644
--- a/src/kudu/integration-tests/alter_table-test.cc
+++ b/src/kudu/integration-tests/alter_table-test.cc
@@ -349,7 +349,7 @@ TEST_F(AlterTableTest, TestAddNotNullableColumnWithoutDefaults) {
         cluster_->mini_master()->master()->catalog_manager();
     master::CatalogManager::ScopedLeaderSharedLock l(catalog);
     ASSERT_OK(l.first_failed_status());
-    Status s = catalog->AlterTableRpc(req, &resp, nullptr);
+    Status s = catalog->AlterTableRpc(req, &resp, /*rpc=*/nullptr);
     ASSERT_TRUE(s.IsInvalidArgument());
     ASSERT_STR_CONTAINS(s.ToString(), "column `c2`: NOT NULL columns must have a default");
   }
diff --git a/src/kudu/integration-tests/cluster_itest_util.cc b/src/kudu/integration-tests/cluster_itest_util.cc
index 9dae097..49a22cb 100644
--- a/src/kudu/integration-tests/cluster_itest_util.cc
+++ b/src/kudu/integration-tests/cluster_itest_util.cc
@@ -65,6 +65,7 @@
 namespace kudu {
 namespace itest {
 
+using boost::optional;
 using client::KuduSchema;
 using client::KuduSchemaBuilder;
 using consensus::BulkChangeConfigRequestPB;
@@ -921,9 +922,13 @@ Status GetTableLocations(const shared_ptr<MasterServiceProxy>& master_proxy,
                          const string& table_name,
                          const MonoDelta& timeout,
                          master::ReplicaTypeFilter filter,
+                         optional<const string&> table_id,
                          master::GetTableLocationsResponsePB* table_locations) {
   master::GetTableLocationsRequestPB req;
   req.mutable_table()->set_table_name(table_name);
+  if (table_id) {
+    req.mutable_table()->set_table_id(*table_id);
+  }
   req.set_replica_type_filter(filter);
   req.set_max_returned_locations(1000);
   rpc::RpcController rpc;
diff --git a/src/kudu/integration-tests/cluster_itest_util.h b/src/kudu/integration-tests/cluster_itest_util.h
index f9a073e..fba9fae 100644
--- a/src/kudu/integration-tests/cluster_itest_util.h
+++ b/src/kudu/integration-tests/cluster_itest_util.h
@@ -342,11 +342,13 @@ Status GetTabletLocations(const std::shared_ptr<master::MasterServiceProxy>& mas
                           master::ReplicaTypeFilter filter,
                           master::TabletLocationsPB* tablet_locations);
 
-// Get the list of tablet locations for all tablets in the specified table from the Master.
+// Get the list of tablet locations for all tablets in the specified table via the given
+// table name (and table ID if provided) from the Master.
 Status GetTableLocations(const std::shared_ptr<master::MasterServiceProxy>& master_proxy,
                          const std::string& table_name,
                          const MonoDelta& timeout,
                          master::ReplicaTypeFilter filter,
+                         boost::optional<const std::string&> table_id,
                          master::GetTableLocationsResponsePB* table_locations);
 
 // Wait for the specified number of voters to be reported to the config on the
diff --git a/src/kudu/integration-tests/consistency-itest.cc b/src/kudu/integration-tests/consistency-itest.cc
index 81231cd..d6b1ca0 100644
--- a/src/kudu/integration-tests/consistency-itest.cc
+++ b/src/kudu/integration-tests/consistency-itest.cc
@@ -21,6 +21,7 @@
 #include <string>
 #include <vector>
 
+#include <boost/optional/optional.hpp>
 #include <gflags/gflags_declare.h>
 #include <glog/logging.h>
 #include <gtest/gtest.h>
@@ -68,6 +69,7 @@ DECLARE_int32(max_clock_sync_error_usec);
 DECLARE_int32(scanner_gc_check_interval_us);
 DECLARE_string(time_source);
 
+using boost::none;
 using kudu::client::ScanConfiguration;
 using kudu::client::sp::shared_ptr;
 using kudu::master::CatalogManager;
@@ -246,7 +248,7 @@ class ConsistencyITest : public MiniClusterITestBase {
     GetTableLocationsResponsePB resp;
     CatalogManager::ScopedLeaderSharedLock l(catalog);
     RETURN_NOT_OK(l.first_failed_status());
-    RETURN_NOT_OK(catalog->GetTableLocations(&req, &resp));
+    RETURN_NOT_OK(catalog->GetTableLocations(&req, &resp, /*user=*/none));
     if (resp.tablet_locations_size() < 1) {
       return Status::NotFound(Substitute("$0: no tablets for key", key_value));
     }
diff --git a/src/kudu/integration-tests/create-table-stress-test.cc b/src/kudu/integration-tests/create-table-stress-test.cc
index 4bd3787..f0b71ee 100644
--- a/src/kudu/integration-tests/create-table-stress-test.cc
+++ b/src/kudu/integration-tests/create-table-stress-test.cc
@@ -24,6 +24,7 @@
 #include <utility>
 #include <vector>
 
+#include <boost/optional/optional.hpp>
 #include <gflags/gflags.h>
 #include <gflags/gflags_declare.h>
 #include <glog/logging.h>
@@ -59,6 +60,7 @@
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
 
+using boost::none;
 using kudu::client::KuduClient;
 using kudu::client::KuduClientBuilder;
 using kudu::client::KuduColumnSchema;
@@ -265,7 +267,7 @@ TEST_F(CreateTableStressTest, TestGetTableLocationsOptions) {
     resp.Clear();
     req.mutable_table()->set_table_name(table_name);
     req.set_max_returned_locations(0);
-    Status s = catalog->GetTableLocations(&req, &resp);
+    Status s = catalog->GetTableLocations(&req, &resp, /*user=*/none);
     ASSERT_STR_CONTAINS(s.ToString(), "must be greater than 0");
   }
 
@@ -276,7 +278,7 @@ TEST_F(CreateTableStressTest, TestGetTableLocationsOptions) {
     resp.Clear();
     req.mutable_table()->set_table_name(table_name);
     req.set_max_returned_locations(1);
-    ASSERT_OK(catalog->GetTableLocations(&req, &resp));
+    ASSERT_OK(catalog->GetTableLocations(&req, &resp, /*user=*/none));
     ASSERT_EQ(resp.tablet_locations_size(), 1);
     // empty since it's the first
     ASSERT_EQ(resp.tablet_locations(0).partition().partition_key_start(), "");
@@ -291,7 +293,7 @@ TEST_F(CreateTableStressTest, TestGetTableLocationsOptions) {
     resp.Clear();
     req.mutable_table()->set_table_name(table_name);
     req.set_max_returned_locations(half_tablets);
-    ASSERT_OK(catalog->GetTableLocations(&req, &resp));
+    ASSERT_OK(catalog->GetTableLocations(&req, &resp, /*user=*/none));
     ASSERT_EQ(half_tablets, resp.tablet_locations_size());
   }
 
@@ -302,7 +304,7 @@ TEST_F(CreateTableStressTest, TestGetTableLocationsOptions) {
     resp.Clear();
     req.mutable_table()->set_table_name(table_name);
     req.set_max_returned_locations(FLAGS_num_test_tablets);
-    ASSERT_OK(catalog->GetTableLocations(&req, &resp));
+    ASSERT_OK(catalog->GetTableLocations(&req, &resp, /*user=*/none));
     ASSERT_EQ(FLAGS_num_test_tablets, resp.tablet_locations_size());
   }
 
@@ -346,7 +348,7 @@ TEST_F(CreateTableStressTest, TestGetTableLocationsOptions) {
     req.mutable_table()->set_table_name(table_name);
     req.set_max_returned_locations(1);
     req.set_partition_key_start(start_key_middle);
-    ASSERT_OK(catalog->GetTableLocations(&req, &resp));
+    ASSERT_OK(catalog->GetTableLocations(&req, &resp, /*user=*/none));
     ASSERT_EQ(1, resp.tablet_locations_size())
         << "Response: [" << pb_util::SecureDebugString(resp) << "]";
     ASSERT_EQ(start_key_middle, resp.tablet_locations(0).partition().partition_key_start());
diff --git a/src/kudu/integration-tests/delete_table-itest.cc b/src/kudu/integration-tests/delete_table-itest.cc
index 6108f7a..ea48ff9 100644
--- a/src/kudu/integration-tests/delete_table-itest.cc
+++ b/src/kudu/integration-tests/delete_table-itest.cc
@@ -77,6 +77,7 @@
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
 
+using boost::none;
 using kudu::client::KuduClient;
 using kudu::client::KuduScanner;
 using kudu::client::KuduScanBatch;
@@ -1175,7 +1176,8 @@ TEST_F(DeleteTableITest, TestNoDeleteTombstonedTablets) {
   ASSERT_OK(inspect_->WaitForReplicaCount(kNumReplicas));
   master::GetTableLocationsResponsePB table_locations;
   ASSERT_OK(itest::GetTableLocations(cluster_->master_proxy(), TestWorkload::kDefaultTableName,
-                                     kTimeout, master::VOTER_REPLICA, &table_locations));
+                                     kTimeout, master::VOTER_REPLICA, /*table_id=*/none,
+                                     &table_locations));
   ASSERT_EQ(1, table_locations.tablet_locations_size()); // Only 1 tablet.
   string tablet_id;
   std::set<string> replicas;
diff --git a/src/kudu/integration-tests/flex_partitioning-itest.cc b/src/kudu/integration-tests/flex_partitioning-itest.cc
index b353e90..edef835 100644
--- a/src/kudu/integration-tests/flex_partitioning-itest.cc
+++ b/src/kudu/integration-tests/flex_partitioning-itest.cc
@@ -27,6 +27,7 @@
 #include <utility>
 #include <vector>
 
+#include <boost/optional/optional.hpp>
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
@@ -56,8 +57,8 @@
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
 
+using boost::none;
 using kudu::client::KuduClient;
-using kudu::client::KuduClientBuilder;
 using kudu::client::KuduColumnSchema;
 using kudu::client::KuduInsert;
 using kudu::client::KuduPredicate;
@@ -432,6 +433,7 @@ void FlexPartitioningITest::CheckPartitionKeyRangeScan() {
                               table_->name(),
                               MonoDelta::FromSeconds(32),
                               master::VOTER_REPLICA,
+                              /*table_id=*/none,
                               &table_locations));
 
   vector<string> rows;
@@ -466,6 +468,7 @@ void FlexPartitioningITest::CheckPartitionKeyRangeScanWithPKRange(int lower, int
                               table_->name(),
                               MonoDelta::FromSeconds(32),
                               master::VOTER_REPLICA,
+                              /*table_id=*/none,
                               &table_locations));
   vector<string> rows;
 
diff --git a/src/kudu/integration-tests/hms_itest-base.cc b/src/kudu/integration-tests/hms_itest-base.cc
new file mode 100644
index 0000000..0369792
--- /dev/null
+++ b/src/kudu/integration-tests/hms_itest-base.cc
@@ -0,0 +1,177 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/integration-tests/hms_itest-base.h"
+
+#include <map>
+#include <memory>
+#include <string>
+#include <type_traits>
+#include <utility>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "kudu/client/client.h"
+#include "kudu/client/schema.h"
+#include "kudu/client/shared_ptr.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/hms/hive_metastore_types.h"
+#include "kudu/hms/hms_client.h"
+#include "kudu/hms/mini_hms.h"
+#include "kudu/mini-cluster/external_mini_cluster.h"
+#include "kudu/util/decimal_util.h"
+#include "kudu/util/net/net_util.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/user.h"
+
+using kudu::client::KuduColumnSchema;
+using kudu::client::KuduSchema;
+using kudu::client::KuduSchemaBuilder;
+using kudu::client::KuduTable;
+using kudu::client::KuduTableCreator;
+using kudu::client::sp::shared_ptr;
+using kudu::hms::HmsClient;
+using std::string;
+using std::unique_ptr;
+using strings::Substitute;
+
+namespace kudu {
+
+Status HmsITestBase::StopHms() {
+  RETURN_NOT_OK(hms_client_->Stop());
+  return cluster_->hms()->Stop();
+}
+
+Status HmsITestBase::StartHms() {
+  RETURN_NOT_OK(cluster_->hms()->Start());
+  return hms_client_->Start();
+}
+
+Status HmsITestBase::CreateDatabase(const string& database_name) {
+  hive::Database db;
+  db.name = database_name;
+  RETURN_NOT_OK(hms_client_->CreateDatabase(db));
+  // Sanity check that the DB is created.
+  RETURN_NOT_OK(hms_client_->GetDatabase(database_name, &db));
+  return Status::OK();
+}
+
+Status HmsITestBase::CreateKuduTable(const string& database_name,
+                                     const string& table_name) {
+  // Get coverage of all column types.
+  KuduSchema schema;
+  KuduSchemaBuilder b;
+  b.AddColumn("key")->Type(KuduColumnSchema::INT32)->NotNull()->PrimaryKey();
+  b.AddColumn("int8_val")->Type(KuduColumnSchema::INT8);
+  b.AddColumn("int16_val")->Type(KuduColumnSchema::INT16);
+  b.AddColumn("int32_val")->Type(KuduColumnSchema::INT32);
+  b.AddColumn("int64_val")->Type(KuduColumnSchema::INT64);
+  b.AddColumn("timestamp_val")->Type(KuduColumnSchema::UNIXTIME_MICROS);
+  b.AddColumn("string_val")->Type(KuduColumnSchema::STRING);
+  b.AddColumn("bool_val")->Type(KuduColumnSchema::BOOL);
+  b.AddColumn("float_val")->Type(KuduColumnSchema::FLOAT);
+  b.AddColumn("double_val")->Type(KuduColumnSchema::DOUBLE);
+  b.AddColumn("binary_val")->Type(KuduColumnSchema::BINARY);
+  b.AddColumn("decimal32_val")->Type(KuduColumnSchema::DECIMAL)
+                              ->Precision(kMaxDecimal32Precision);
+  b.AddColumn("decimal64_val")->Type(KuduColumnSchema::DECIMAL)
+                              ->Precision(kMaxDecimal64Precision);
+  b.AddColumn("decimal128_val")->Type(KuduColumnSchema::DECIMAL)
+                               ->Precision(kMaxDecimal128Precision);
+
+  RETURN_NOT_OK(b.Build(&schema));
+  unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
+  return table_creator->table_name(Substitute("$0.$1", database_name, table_name))
+                       .schema(&schema)
+                       .num_replicas(1)
+                       .set_range_partition_columns({ "key" })
+                       .Create();
+}
+
+Status HmsITestBase::RenameHmsTable(const string& database_name,
+                                    const string& old_table_name,
+                                    const string& new_table_name) {
+  // The HMS doesn't have a rename table API. Instead it offers the more
+  // general AlterTable API, which requires the entire set of table fields to be
+  // set. Since we don't know these fields during a simple rename operation, we
+  // have to look them up.
+  hive::Table table;
+  RETURN_NOT_OK(hms_client_->GetTable(database_name, old_table_name, &table));
+  table.tableName = new_table_name;
+  return hms_client_->AlterTable(database_name, old_table_name, table);
+}
+
+Status HmsITestBase::AlterHmsTableDropColumns(const string& database_name,
+                                              const string& table_name) {
+    hive::Table table;
+    RETURN_NOT_OK(hms_client_->GetTable(database_name, table_name, &table));
+    table.sd.cols.clear();
+
+    // The KuduMetastorePlugin only allows the master to alter the columns in a
+    // Kudu table, so we pretend to be the master.
+    hive::EnvironmentContext env_ctx;
+    env_ctx.__set_properties({ std::make_pair(hms::HmsClient::kKuduMasterEventKey, "true") });
+    RETURN_NOT_OK(hms_client_->AlterTable(database_name, table_name, table, env_ctx));
+    return Status::OK();
+}
+
+void HmsITestBase::CheckTable(const string& database_name,
+                              const string& table_name,
+                              boost::optional<const string&> user) {
+  SCOPED_TRACE(Substitute("Checking table $0.$1", database_name, table_name));
+  shared_ptr<KuduTable> table;
+  ASSERT_OK(client_->OpenTable(Substitute("$0.$1", database_name, table_name), &table));
+
+  hive::Table hms_table;
+  ASSERT_OK(hms_client_->GetTable(database_name, table_name, &hms_table));
+
+  string username;
+  if (user) {
+    username = *user;
+  } else {
+    ASSERT_OK(GetLoggedInUser(&username));
+  }
+  ASSERT_EQ(hms_table.owner, username);
+
+  const auto& schema = table->schema();
+  ASSERT_EQ(schema.num_columns(), hms_table.sd.cols.size());
+  for (int idx = 0; idx < schema.num_columns(); idx++) {
+    ASSERT_EQ(schema.Column(idx).name(), hms_table.sd.cols[idx].name);
+  }
+  ASSERT_EQ(table->id(), hms_table.parameters[hms::HmsClient::kKuduTableIdKey]);
+  ASSERT_EQ(HostPort::ToCommaSeparatedString(cluster_->master_rpc_addrs()),
+            hms_table.parameters[hms::HmsClient::kKuduMasterAddrsKey]);
+  ASSERT_EQ(hms::HmsClient::kKuduStorageHandler,
+            hms_table.parameters[hms::HmsClient::kStorageHandlerKey]);
+}
+
+void HmsITestBase::CheckTableDoesNotExist(const string& database_name,
+                                          const string& table_name) {
+  SCOPED_TRACE(Substitute("Checking table $0.$1 does not exist", database_name, table_name));
+
+  shared_ptr<KuduTable> table;
+  Status s = client_->OpenTable(Substitute("$0.$1", database_name, table_name), &table);
+  ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+
+  hive::Table hms_table;
+  s = hms_client_->GetTable(database_name, table_name, &hms_table);
+  ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+}
+
+} // namespace kudu
diff --git a/src/kudu/integration-tests/hms_itest-base.h b/src/kudu/integration-tests/hms_itest-base.h
new file mode 100644
index 0000000..57f1a3f
--- /dev/null
+++ b/src/kudu/integration-tests/hms_itest-base.h
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <string>
+
+#include <boost/optional/optional.hpp>
+
+#include "kudu/hms/hms_client.h"
+#include "kudu/integration-tests/external_mini_cluster-itest-base.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+class HmsITestBase : public ExternalMiniClusterITestBase {
+ public:
+  Status StopHms();
+  Status StartHms();
+
+  // Creates a database in the HMS catalog.
+  Status CreateDatabase(const std::string& database_name);
+
+  // Creates a table in Kudu.
+  Status CreateKuduTable(const std::string& database_name,
+                         const std::string& table_name);
+
+  // Renames a table entry in the HMS catalog.
+  Status RenameHmsTable(const std::string& database_name,
+                        const std::string& old_table_name,
+                        const std::string& new_table_name);
+
+  // Drops all columns from a Kudu HMS table entry.
+  Status AlterHmsTableDropColumns(const std::string& database_name,
+                                  const std::string& table_name);
+
+  // Checks that the Kudu table schema and the HMS table entry in their
+  // respective catalogs are synchronized for a particular table. It also
+  // verifies that the table owner is the given user (if not provided,
+  // checks against the logged in user).
+  void CheckTable(const std::string& database_name,
+                  const std::string& table_name,
+                  boost::optional<const std::string&> user);
+
+  // Checks that a table does not exist in the Kudu and HMS catalogs.
+  void CheckTableDoesNotExist(const std::string& database_name,
+                              const std::string& table_name);
+
+ protected:
+  std::unique_ptr<hms::HmsClient> hms_client_;
+};
+
+} // namespace kudu
diff --git a/src/kudu/integration-tests/master_hms-itest.cc b/src/kudu/integration-tests/master_hms-itest.cc
index 6ea87a9..f539c08 100644
--- a/src/kudu/integration-tests/master_hms-itest.cc
+++ b/src/kudu/integration-tests/master_hms-itest.cc
@@ -16,18 +16,15 @@
 // under the License.
 
 #include <algorithm>
-#include <map>
 #include <memory>
 #include <string>
-#include <type_traits>
-#include <utility>
 #include <vector>
 
+#include <boost/optional/optional.hpp>
 #include <glog/stl_logging.h>
 #include <gtest/gtest.h>
 
 #include "kudu/client/client.h"
-#include "kudu/client/schema.h"
 #include "kudu/client/shared_ptr.h"
 #include "kudu/common/common.pb.h"
 #include "kudu/common/table_util.h"
@@ -35,40 +32,34 @@
 #include "kudu/hms/hive_metastore_types.h"
 #include "kudu/hms/hms_client.h"
 #include "kudu/hms/mini_hms.h"
-#include "kudu/integration-tests/external_mini_cluster-itest-base.h"
+#include "kudu/integration-tests/hms_itest-base.h"
 #include "kudu/mini-cluster/external_mini_cluster.h"
 #include "kudu/mini-cluster/mini_cluster.h"
 #include "kudu/security/test/mini_kdc.h"
 #include "kudu/thrift/client.h"
-#include "kudu/util/decimal_util.h"
-#include "kudu/util/net/net_util.h"
 #include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
-#include "kudu/util/user.h"
 
-namespace kudu {
-
-using client::KuduColumnSchema;
-using client::KuduSchema;
-using client::KuduSchemaBuilder;
-using client::KuduTable;
-using client::KuduTableAlterer;
-using client::KuduTableCreator;
-using client::sp::shared_ptr;
-using cluster::ExternalMiniClusterOptions;
-using hms::HmsClient;
+using boost::none;
+using kudu::client::KuduTable;
+using kudu::client::KuduTableAlterer;
+using kudu::client::sp::shared_ptr;
+using kudu::cluster::ExternalMiniClusterOptions;
+using kudu::hms::HmsClient;
 using std::string;
 using std::unique_ptr;
 using std::vector;
 using strings::Substitute;
 
+namespace kudu {
+
 // Test Master <-> HMS catalog synchronization.
-class MasterHmsTest : public ExternalMiniClusterITestBase {
+class MasterHmsTest : public HmsITestBase {
  public:
 
   void SetUp() override {
-    ExternalMiniClusterITestBase::SetUp();
+    HmsITestBase::SetUp();
 
     ExternalMiniClusterOptions opts;
     opts.hms_mode = GetHmsMode();
@@ -87,139 +78,12 @@ class MasterHmsTest : public ExternalMiniClusterITestBase {
   }
 
   void TearDown() override {
-    ASSERT_OK(hms_client_->Stop());
-    ExternalMiniClusterITestBase::TearDown();
-  }
-
-  Status StopHms() {
-    RETURN_NOT_OK(hms_client_->Stop());
-    RETURN_NOT_OK(cluster_->hms()->Stop());
-    return Status::OK();
-  }
-
-  Status StartHms() {
-    RETURN_NOT_OK(cluster_->hms()->Start());
-    RETURN_NOT_OK(hms_client_->Start());
-    return Status::OK();
-  }
-
-  Status CreateDatabase(const string& database_name) {
-    hive::Database db;
-    db.name = database_name;
-    RETURN_NOT_OK(hms_client_->CreateDatabase(db));
-    // Sanity check that the DB is created.
-    RETURN_NOT_OK(hms_client_->GetDatabase(database_name, &db));
-    return Status::OK();
-  }
-
-  Status CreateKuduTable(const string& database_name, const string& table_name) {
-    // Get coverage of all column types.
-    KuduSchema schema;
-    KuduSchemaBuilder b;
-    b.AddColumn("key")->Type(KuduColumnSchema::INT32)->NotNull()->PrimaryKey();
-    b.AddColumn("int8_val")->Type(KuduColumnSchema::INT8);
-    b.AddColumn("int16_val")->Type(KuduColumnSchema::INT16);
-    b.AddColumn("int32_val")->Type(KuduColumnSchema::INT32);
-    b.AddColumn("int64_val")->Type(KuduColumnSchema::INT64);
-    b.AddColumn("timestamp_val")->Type(KuduColumnSchema::UNIXTIME_MICROS);
-    b.AddColumn("string_val")->Type(KuduColumnSchema::STRING);
-    b.AddColumn("bool_val")->Type(KuduColumnSchema::BOOL);
-    b.AddColumn("float_val")->Type(KuduColumnSchema::FLOAT);
-    b.AddColumn("double_val")->Type(KuduColumnSchema::DOUBLE);
-    b.AddColumn("binary_val")->Type(KuduColumnSchema::BINARY);
-    b.AddColumn("decimal32_val")->Type(KuduColumnSchema::DECIMAL)
-        ->Precision(kMaxDecimal32Precision);
-    b.AddColumn("decimal64_val")->Type(KuduColumnSchema::DECIMAL)
-        ->Precision(kMaxDecimal64Precision);
-    b.AddColumn("decimal128_val")->Type(KuduColumnSchema::DECIMAL)
-        ->Precision(kMaxDecimal128Precision);
-
-    RETURN_NOT_OK(b.Build(&schema));
-    unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
-    return table_creator->table_name(Substitute("$0.$1", database_name, table_name))
-                         .schema(&schema)
-                         .num_replicas(1)
-                         .set_range_partition_columns({ "key" })
-                         .Create();
-  }
-
-  // Rename a table entry in the HMS catalog.
-  Status RenameHmsTable(const string& database_name,
-                        const string& old_table_name,
-                        const string& new_table_name) {
-    // The HMS doesn't have a rename table API. Instead it offers the more
-    // general AlterTable API, which requires the entire set of table fields to be
-    // set. Since we don't know these fields during a simple rename operation, we
-    // have to look them up.
-    hive::Table table;
-    RETURN_NOT_OK(hms_client_->GetTable(database_name, old_table_name, &table));
-    table.tableName = new_table_name;
-    return hms_client_->AlterTable(database_name, old_table_name, table);
-  }
-
-  // Drop all columns from a Kudu HMS table entry.
-  Status AlterHmsTableDropColumns(const string& database_name, const string& table_name) {
-    hive::Table table;
-    RETURN_NOT_OK(hms_client_->GetTable(database_name, table_name, &table));
-    table.sd.cols.clear();
-
-    // The KuduMetastorePlugin only allows the master to alter the columns in a
-    // Kudu table, so we pretend to be the master.
-    hive::EnvironmentContext env_ctx;
-    env_ctx.__set_properties({ std::make_pair(hms::HmsClient::kKuduMasterEventKey, "true") });
-    RETURN_NOT_OK(hms_client_->AlterTable(database_name, table_name, table, env_ctx));
-    return Status::OK();
-  }
-
-  // Checks that the Kudu table schema and the HMS table entry in their
-  // respective catalogs are synchronized for a particular table.
-  void CheckTable(const string& database_name, const string& table_name) {
-    SCOPED_TRACE(Substitute("Checking table $0.$1", database_name, table_name));
-    shared_ptr<KuduTable> table;
-    ASSERT_OK(client_->OpenTable(Substitute("$0.$1", database_name, table_name), &table));
-    KuduSchema schema = table->schema();
-
-    hive::Table hms_table;
-    ASSERT_OK(hms_client_->GetTable(database_name, table_name, &hms_table));
-
-    string username;
-    ASSERT_OK(GetLoggedInUser(&username));
-
-    ASSERT_EQ(hms_table.owner, username);
-    ASSERT_EQ(schema.num_columns(), hms_table.sd.cols.size());
-    for (int idx = 0; idx < schema.num_columns(); idx++) {
-      ASSERT_EQ(schema.Column(idx).name(), hms_table.sd.cols[idx].name);
+    if (hms_client_) {
+      ASSERT_OK(hms_client_->Stop());
     }
-    ASSERT_EQ(table->id(), hms_table.parameters[hms::HmsClient::kKuduTableIdKey]);
-    ASSERT_EQ(HostPort::ToCommaSeparatedString(cluster_->master_rpc_addrs()),
-              hms_table.parameters[hms::HmsClient::kKuduMasterAddrsKey]);
-    ASSERT_EQ(hms::HmsClient::kKuduStorageHandler,
-              hms_table.parameters[hms::HmsClient::kStorageHandlerKey]);
+    HmsITestBase::TearDown();
   }
 
-  // Checks that a table does not exist in the Kudu and HMS catalogs.
-  void CheckTableDoesNotExist(const string& database_name, const string& table_name) {
-    SCOPED_TRACE(Substitute("Checking table $0.$1 does not exist", database_name, table_name));
-
-    shared_ptr<KuduTable> table;
-    Status s = client_->OpenTable(Substitute("$0.$1", database_name, table_name), &table);
-    ASSERT_TRUE(s.IsNotFound()) << s.ToString();
-
-    hive::Table hms_table;
-    s = hms_client_->GetTable(database_name, table_name, &hms_table);
-    ASSERT_TRUE(s.IsNotFound()) << s.ToString();
-  }
-
-  static hive::EnvironmentContext MasterEnvCtx() {
-    hive::EnvironmentContext env_ctx;
-    env_ctx.__set_properties({ std::make_pair(hms::HmsClient::kKuduMasterEventKey, "true") });
-    return env_ctx;
-  }
-
- protected:
-
-  unique_ptr<HmsClient> hms_client_;
-
  private:
 
   virtual HmsMode GetHmsMode() {
@@ -261,7 +125,7 @@ TEST_F(MasterHmsTest, TestCreateTable) {
   // Drop the HMS entry and create the table through Kudu.
   ASSERT_OK(hms_client_->DropTable(hms_database_name, hms_table_name));
   ASSERT_OK(CreateKuduTable(hms_database_name, hms_table_name));
-  NO_FATALS(CheckTable(hms_database_name, hms_table_name));
+  NO_FATALS(CheckTable(hms_database_name, hms_table_name, /*user=*/none));
 
   // Shutdown the HMS and try to create a table.
   ASSERT_OK(StopHms());
@@ -275,14 +139,14 @@ TEST_F(MasterHmsTest, TestCreateTable) {
     // HmsCatalog throttles reconnections, so it's necessary to wait out the backoff.
     ASSERT_OK(CreateKuduTable(hms_database_name, "foo"));
   });
-  NO_FATALS(CheckTable(hms_database_name, "foo"));
+  NO_FATALS(CheckTable(hms_database_name, "foo", /*user=*/none));
 }
 
 TEST_F(MasterHmsTest, TestRenameTable) {
   // Create the database and Kudu table.
   ASSERT_OK(CreateDatabase("db"));
   ASSERT_OK(CreateKuduTable("db", "a"));
-  NO_FATALS(CheckTable("db", "a"));
+  NO_FATALS(CheckTable("db", "a", /*user=*/none));
 
   // Create a non-Kudu ('external') HMS table entry.
   hive::Table external_table;
@@ -332,13 +196,13 @@ TEST_F(MasterHmsTest, TestRenameTable) {
     // HmsCatalog throttles reconnections, so it's necessary to wait out the backoff.
     ASSERT_OK(table_alterer->Alter());
   });
-  NO_FATALS(CheckTable("db", "c"));
+  NO_FATALS(CheckTable("db", "c", /*user=*/ none));
   NO_FATALS(CheckTableDoesNotExist("db", "a"));
 
   // Rename the table through the HMS, and ensure the rename is handled in Kudu.
   ASSERT_OK(RenameHmsTable("db", "c", "d"));
   ASSERT_EVENTUALLY([&] {
-    NO_FATALS(CheckTable("db", "d"));
+    NO_FATALS(CheckTable("db", "d", /*user=*/ none));
   });
 
   // Check that the two tables still exist.
@@ -355,10 +219,10 @@ TEST_F(MasterHmsTest, TestRenameTable) {
   ASSERT_OK(CreateDatabase("db1"));
   ASSERT_OK(CreateDatabase("db2"));
   ASSERT_OK(CreateKuduTable("db1", "t1"));
-  NO_FATALS(CheckTable("db1", "t1"));
+  NO_FATALS(CheckTable("db1", "t1", /*user=*/ none));
   table_alterer.reset(client_->NewTableAlterer("db1.t1"));
   ASSERT_OK(table_alterer->RenameTo("db2.t2")->Alter());
-  NO_FATALS(CheckTable("db2", "t2"));
+  NO_FATALS(CheckTable("db2", "t2", /*user=*/ none));
   NO_FATALS(CheckTableDoesNotExist("db1", "t1"));
   NO_FATALS(CheckTableDoesNotExist("db1", "t2"));
 }
@@ -366,7 +230,7 @@ TEST_F(MasterHmsTest, TestRenameTable) {
 TEST_F(MasterHmsTest, TestAlterTable) {
   // Create the Kudu table.
   ASSERT_OK(CreateKuduTable("default", "a"));
-  NO_FATALS(CheckTable("default", "a"));
+  NO_FATALS(CheckTable("default", "a", /*user=*/ none));
 
   // Alter the HMS table entry in a destructive way (remove all columns).
   ASSERT_OK(AlterHmsTableDropColumns("default", "a"));
@@ -374,7 +238,7 @@ TEST_F(MasterHmsTest, TestAlterTable) {
   // Drop a column in Kudu. This should correct the entire set of columns in the HMS.
   unique_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer("default.a"));
   ASSERT_OK(table_alterer->DropColumn("int8_val")->Alter());
-  NO_FATALS(CheckTable("default", "a"));
+  NO_FATALS(CheckTable("default", "a", /*user=*/ none));
 
   // Shutdown the HMS and try to alter the table.
   ASSERT_OK(StopHms());
@@ -388,7 +252,7 @@ TEST_F(MasterHmsTest, TestAlterTable) {
     // HmsCatalog throttles reconnections, so it's necessary to wait out the backoff.
     ASSERT_OK(table_alterer->Alter());
   });
-  NO_FATALS(CheckTable("default", "a"));
+  NO_FATALS(CheckTable("default", "a", /*user=*/ none));
 
   // Only alter the table in Kudu, the corresponding table in the HMS will not be altered.
   table_alterer.reset(client_->NewTableAlterer("default.a")->RenameTo("default.b")
@@ -404,7 +268,7 @@ TEST_F(MasterHmsTest, TestAlterTable) {
 TEST_F(MasterHmsTest, TestDeleteTable) {
   // Create a Kudu table, then drop it from Kudu and ensure the HMS entry is removed.
   ASSERT_OK(CreateKuduTable("default", "a"));
-  NO_FATALS(CheckTable("default", "a"));
+  NO_FATALS(CheckTable("default", "a", /*user=*/ none));
   hive::Table hms_table;
   ASSERT_OK(hms_client_->GetTable("default", "a", &hms_table));
 
@@ -413,7 +277,7 @@ TEST_F(MasterHmsTest, TestDeleteTable) {
 
   // Create the Kudu table, then drop it from the HMS, and ensure the Kudu table is deleted.
   ASSERT_OK(CreateKuduTable("default", "b"));
-  NO_FATALS(CheckTable("default", "b"));
+  NO_FATALS(CheckTable("default", "b", /*user=*/ none));
   hive::Table hms_table_b;
   ASSERT_OK(hms_client_->GetTable("default", "b", &hms_table_b));
   shared_ptr<KuduTable> table;
@@ -425,12 +289,12 @@ TEST_F(MasterHmsTest, TestDeleteTable) {
 
   // Ensure that dropping a table while the HMS is unreachable fails.
   ASSERT_OK(CreateKuduTable("default", "c"));
-  NO_FATALS(CheckTable("default", "c"));
+  NO_FATALS(CheckTable("default", "c", /*user=*/ none));
   ASSERT_OK(StopHms());
   Status s = client_->DeleteTable("default.c");
   ASSERT_TRUE(s.IsNetworkError()) << s.ToString();
   ASSERT_OK(StartHms());
-  NO_FATALS(CheckTable("default", "c"));
+  NO_FATALS(CheckTable("default", "c", /*user=*/ none));
   ASSERT_EVENTUALLY([&] {
     // HmsCatalog throttles reconnections, so it's necessary to wait out the backoff.
     ASSERT_OK(client_->DeleteTable("default.c"));
@@ -440,7 +304,7 @@ TEST_F(MasterHmsTest, TestDeleteTable) {
   // Create a Kudu table, then only drop it from Kudu. Ensure the HMS
   // entry is not removed.
   ASSERT_OK(CreateKuduTable("default", "d"));
-  NO_FATALS(CheckTable("default", "d"));
+  NO_FATALS(CheckTable("default", "d", /*user=*/ none));
   hive::Table hms_table_d;
   ASSERT_OK(hms_client_->GetTable("default", "d", &hms_table_d));
   ASSERT_OK(client_->DeleteTableInCatalogs("default.d", false));
@@ -452,14 +316,14 @@ TEST_F(MasterHmsTest, TestDeleteTable) {
 TEST_F(MasterHmsTest, TestNotificationLogListener) {
   // Create a Kudu table.
   ASSERT_OK(CreateKuduTable("default", "a"));
-  NO_FATALS(CheckTable("default", "a"));
+  NO_FATALS(CheckTable("default", "a", /*user=*/ none));
 
   // Rename the table in the HMS, and ensure that the notification log listener
   // detects the rename and updates the Kudu catalog accordingly.
   ASSERT_OK(RenameHmsTable("default", "a", "b"));
   ASSERT_EVENTUALLY([&] {
     NO_FATALS({
-      CheckTable("default", "b");
+      CheckTable("default", "b", /*user=*/ none);
       CheckTableDoesNotExist("default", "a");
     });
   });
@@ -477,10 +341,10 @@ TEST_F(MasterHmsTest, TestNotificationLogListener) {
   unique_ptr<KuduTableAlterer> table_alterer;
   table_alterer.reset(client_->NewTableAlterer("default.a")->RenameTo("default.b"));
   ASSERT_OK(table_alterer->Alter());
-  NO_FATALS(CheckTable("default", "b"));
+  NO_FATALS(CheckTable("default", "b", /*user=*/ none));
   table_alterer.reset(client_->NewTableAlterer("default.b")->RenameTo("default.a"));
   ASSERT_OK(table_alterer->Alter());
-  NO_FATALS(CheckTable("default", "a"));
+  NO_FATALS(CheckTable("default", "a", /*user=*/ none));
 
 
   // Ensure that Kudu can rename a table just after it's been renamed through the HMS.
@@ -511,9 +375,9 @@ TEST_F(MasterHmsTest, TestNotificationLogListener) {
 
 TEST_F(MasterHmsTest, TestUppercaseIdentifiers) {
   ASSERT_OK(CreateKuduTable("default", "MyTable"));
-  NO_FATALS(CheckTable("default", "MyTable"));
-  NO_FATALS(CheckTable("default", "mytable"));
-  NO_FATALS(CheckTable("default", "MYTABLE"));
+  NO_FATALS(CheckTable("default", "MyTable", /*user=*/none));
+  NO_FATALS(CheckTable("default", "mytable", /*user=*/none));
+  NO_FATALS(CheckTable("default", "MYTABLE", /*user=*/none));
 
   // Kudu table schema lookups should be case-insensitive.
   for (const auto& name : { "default.MyTable",
@@ -540,14 +404,14 @@ TEST_F(MasterHmsTest, TestUppercaseIdentifiers) {
   // Rename the table to something different.
   table_alterer.reset(client_->NewTableAlterer("DEFAULT.MYTABLE"));
   ASSERT_OK(table_alterer->RenameTo("default.T_1/1")->Alter());
-  NO_FATALS(CheckTable("default", "T_1/1"));
-  NO_FATALS(CheckTable("default", "t_1/1"));
-  NO_FATALS(CheckTable("DEFAULT", "T_1/1"));
+  NO_FATALS(CheckTable("default", "T_1/1", /*user=*/none));
+  NO_FATALS(CheckTable("default", "t_1/1", /*user=*/none));
+  NO_FATALS(CheckTable("DEFAULT", "T_1/1", /*user=*/none));
 
   // Rename the table through the HMS.
   ASSERT_OK(RenameHmsTable("default", "T_1/1", "AbC"));
   ASSERT_EVENTUALLY([&] {
-    NO_FATALS(CheckTable("default", "AbC"));
+    NO_FATALS(CheckTable("default", "AbC", /*user=*/none));
   });
 
   // Listing tables shows the normalized case.
diff --git a/src/kudu/integration-tests/master_sentry-itest.cc b/src/kudu/integration-tests/master_sentry-itest.cc
index ef2ad2e..939a260 100644
--- a/src/kudu/integration-tests/master_sentry-itest.cc
+++ b/src/kudu/integration-tests/master_sentry-itest.cc
@@ -15,96 +15,667 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <functional>
 #include <memory>
 #include <string>
+#include <unordered_set>
 #include <utility>
+#include <vector>
 
+#include <boost/optional/optional.hpp>
 #include <gtest/gtest.h>
 
 #include "kudu/client/client.h"
 #include "kudu/client/schema.h"
 #include "kudu/client/shared_ptr.h"
 #include "kudu/common/common.pb.h"
+#include "kudu/common/table_util.h"
+#include "kudu/gutil/stl_util.h"
 #include "kudu/gutil/strings/substitute.h"
-#include "kudu/hms/hive_metastore_types.h"
 #include "kudu/hms/hms_client.h"
 #include "kudu/hms/mini_hms.h"
-#include "kudu/integration-tests/external_mini_cluster-itest-base.h"
+#include "kudu/integration-tests/cluster_itest_util.h"
+#include "kudu/integration-tests/hms_itest-base.h"
+#include "kudu/master/master.pb.h"
+#include "kudu/master/master.proxy.h"
+#include "kudu/master/sentry_authz_provider-test-base.h"
 #include "kudu/mini-cluster/external_mini_cluster.h"
+#include "kudu/rpc/user_credentials.h"
 #include "kudu/security/test/mini_kdc.h"
+#include "kudu/sentry/mini_sentry.h"
+#include "kudu/sentry/sentry_client.h"
+#include "kudu/sentry/sentry_policy_service_types.h"
 #include "kudu/thrift/client.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
 
-namespace kudu {
-
-using client::KuduColumnSchema;
-using client::KuduSchema;
-using client::KuduSchemaBuilder;
-using client::KuduTableCreator;
-using cluster::ExternalMiniClusterOptions;
-using hms::HmsClient;
+using ::sentry::TSentryGrantOption;
+using ::sentry::TSentryPrivilege;
+using boost::make_optional;
+using boost::none;
+using boost::optional;
+using kudu::client::KuduClient;
+using kudu::client::KuduScanToken;
+using kudu::client::KuduScanTokenBuilder;
+using kudu::client::KuduSchema;
+using kudu::client::KuduTable;
+using kudu::client::KuduTableAlterer;
+using kudu::client::sp::shared_ptr;
+using kudu::hms::HmsClient;
+using kudu::master::GetTableLocationsResponsePB;
+using kudu::master::MasterServiceProxy;
+using kudu::master::TabletLocationsPB;
+using kudu::rpc::UserCredentials;
+using kudu::sentry::SentryClient;
+using std::function;
 using std::string;
 using std::unique_ptr;
+using std::unordered_set;
+using std::vector;
 using strings::Substitute;
 
+namespace kudu {
+
 // Test Master authorization enforcement with Sentry and HMS
 // integration enabled.
-class MasterSentryTest : public ExternalMiniClusterITestBase,
-                         public ::testing::WithParamInterface<bool> {
+class SentryITestBase : public HmsITestBase {
  public:
+  static const char* const kAdminGroup;
+  static const char* const kAdminUser;
+  static const char* const kUserGroup;
+  static const char* const kTestUser;
+  static const char* const kImpalaUser;
+  static const char* const kDevRole;
+  static const char* const kAdminRole;
+  static const char* const kDatabaseName;
+  static const char* const kTableName;
+  static const char* const kSecondTable;
+
+  Status StopSentry() {
+    RETURN_NOT_OK(sentry_client_->Stop());
+    RETURN_NOT_OK(cluster_->sentry()->Stop());
+    return Status::OK();
+  }
+
+  Status StartSentry() {
+    RETURN_NOT_OK(cluster_->sentry()->Start());
+    RETURN_NOT_OK(cluster_->kdc()->Kinit("kudu"));
+    RETURN_NOT_OK(sentry_client_->Start());
+    return Status::OK();
+  }
+
+ Status GetTableLocationsWithTableId(const string& table_name,
+                                     optional<const string&> table_id) {
+    const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
+    std::shared_ptr<MasterServiceProxy> proxy = cluster_->master_proxy();
+    UserCredentials user_credentials;
+    user_credentials.set_real_user(kTestUser);
+    proxy->set_user_credentials(user_credentials);
+
+    GetTableLocationsResponsePB table_locations;
+    return itest::GetTableLocations(proxy, table_name, kTimeout, master::VOTER_REPLICA,
+                                    table_id, &table_locations);
+ }
+
+  Status GrantCreateTablePrivilege(const string& database_name,
+                                   const string& /*table_name*/) {
+    TSentryPrivilege privilege = master::GetDatabasePrivilege(
+        database_name, "CREATE",
+        TSentryGrantOption::DISABLED);
+    return master::AlterRoleGrantPrivilege(sentry_client_.get(), kDevRole, privilege);
+  }
+
+  Status GrantDropTablePrivilege(const string& database_name,
+                                 const string& table_name) {
+    TSentryPrivilege privilege = master::GetTablePrivilege(
+        database_name, table_name, "DROP",
+        TSentryGrantOption::DISABLED);
+    return master::AlterRoleGrantPrivilege(sentry_client_.get(), kDevRole, privilege);
+  }
+
+  Status GrantAlterTablePrivilege(const string& database_name,
+                                  const string& table_name) {
+    TSentryPrivilege privilege = master::GetTablePrivilege(
+        database_name, table_name, "ALTER",
+        TSentryGrantOption::DISABLED);
+    return master::AlterRoleGrantPrivilege(sentry_client_.get(), kDevRole, privilege);
+  }
+
+  Status GrantRenameTablePrivilege(const string& database_name,
+                                   const string& table_name) {
+    TSentryPrivilege privilege = master::GetTablePrivilege(
+        database_name, table_name, "ALL",
+        TSentryGrantOption::DISABLED);
+    RETURN_NOT_OK(master::AlterRoleGrantPrivilege(sentry_client_.get(), kDevRole, privilege));
+    privilege = master::GetDatabasePrivilege(
+        database_name, "CREATE",
+        TSentryGrantOption::DISABLED);
+    return master::AlterRoleGrantPrivilege(sentry_client_.get(), kDevRole, privilege);
+  }
+
+  Status GrantGetMetadataTablePrivilege(const string& database_name,
+                                        const string& table_name) {
+    TSentryPrivilege privilege = master::GetTablePrivilege(
+        database_name, table_name, "METADATA",
+        TSentryGrantOption::DISABLED);
+    return master::AlterRoleGrantPrivilege(sentry_client_.get(), kDevRole, privilege);
+  }
+
+  Status CreateTable(const string& table,
+                     const string& /*new_table*/) {
+    Slice hms_database;
+    Slice hms_table;
+    RETURN_NOT_OK(ParseHiveTableIdentifier(table, &hms_database, &hms_table));
+    return CreateKuduTable(hms_database.ToString(), hms_table.ToString());
+  }
+
+  Status IsCreateTableDone(const string& table,
+                           const string& /*new_table*/) {
+    bool in_progress = false;
+    return client_->IsCreateTableInProgress(table, &in_progress);
+  }
+
+  Status DeleteTable(const string& table,
+                     const string& /*new_table*/) {
+    return client_->DeleteTable(table);
+  }
+
+  Status AlterTable(const string& table,
+                    const string& /*new_table*/) {
+    unique_ptr<KuduTableAlterer> table_alterer;
+    table_alterer.reset(client_->NewTableAlterer(table)
+                               ->DropColumn("int32_val"));
+    return table_alterer->Alter();
+  }
+
+  Status IsAlterTableDone(const string& table,
+                          const string& /*new_table*/) {
+    bool in_progress = false;
+    return client_->IsAlterTableInProgress(table, &in_progress);
+  }
+
+  Status RenameTable(const string& table,
+                     const string& new_table) {
+    unique_ptr<KuduTableAlterer> table_alterer;
+    table_alterer.reset(client_->NewTableAlterer(table)
+                               ->RenameTo(new_table));
+    return table_alterer->Alter();
+  }
+
+  Status GetTableSchema(const string& table,
+                        const string& /*new_table*/) {
+    KuduSchema schema;
+    return client_->GetTableSchema(table, &schema);
+  }
+
+  Status GetTableLocations(const string& table,
+                           const string& /*new_table*/) {
+    return GetTableLocationsWithTableId(table, /*table_id=*/none);
+  }
+
+  Status GetTabletLocations(const string& table,
+                            const string& /*new_table*/) {
+    // Log in as 'test-admin' to get the tablet ID.
+    RETURN_NOT_OK(cluster_->kdc()->Kinit(kAdminUser));
+    shared_ptr<KuduClient> client;
+    RETURN_NOT_OK(cluster_->CreateClient(nullptr, &client));
+
+    shared_ptr<KuduTable> kudu_table;
+    RETURN_NOT_OK(client->OpenTable(table, &kudu_table));
+    vector<KuduScanToken*> tokens;
+    ElementDeleter deleter(&tokens);
+    KuduScanTokenBuilder builder(kudu_table.get());
+    RETURN_NOT_OK(builder.Build(&tokens));
+    const string tablet_id = tokens[0]->tablet().id();
+
+    // Log back in as 'test-user'.
+    RETURN_NOT_OK(cluster_->kdc()->Kinit(kTestUser));
+
+    const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
+    std::shared_ptr<MasterServiceProxy> proxy = cluster_->master_proxy();
+    UserCredentials user_credentials;
+    user_credentials.set_real_user(kTestUser);
+    proxy->set_user_credentials(user_credentials);
+
+    TabletLocationsPB tablet_locations;
+    return itest::GetTabletLocations(proxy, tablet_id, kTimeout,
+                                     master::VOTER_REPLICA, &tablet_locations);
+  }
 
   void SetUp() override {
-    ExternalMiniClusterITestBase::SetUp();
-    ExternalMiniClusterOptions opts;
+    HmsITestBase::SetUp();
+
+    cluster::ExternalMiniClusterOptions opts;
+    // Always enable Kerberos, as sentry deployment does not make much sense
+    // in a non-Kerberized environment.
+    bool enable_kerberos = true;
     opts.hms_mode = HmsMode::ENABLE_METASTORE_INTEGRATION;
     opts.enable_sentry = true;
-    opts.enable_kerberos = enable_kerberos_;
+    opts.enable_kerberos = enable_kerberos;
+    // Configure the timeout to reduce the run time of tests that involve
+    // re-connections.
+    const string timeout = AllowSlowTests() ? "5" : "2";
+    opts.extra_master_flags.emplace_back(
+       Substitute("$0=$1", "--sentry_service_send_timeout_seconds", timeout));
+    opts.extra_master_flags.emplace_back(
+       Substitute("$0=$1", "--sentry_service_recv_timeout_seconds", timeout));
+
+    // Add 'impala' as trusted user who may access the cluster without being
+    // authorized.
+    opts.extra_master_flags.emplace_back("--trusted_user_acl=impala");
+    opts.extra_master_flags.emplace_back("--user_acl=test-user,impala");
     StartClusterWithOpts(std::move(opts));
-  }
 
- protected:
-    const bool enable_kerberos_ = GetParam();
-};
-INSTANTIATE_TEST_CASE_P(KerberosEnabled, MasterSentryTest, ::testing::Bool());
-
-// TODO(hao): Write a proper test when master authorization enforcement is done.
-TEST_P(MasterSentryTest, TestFoo) {
-  const string kDatabaseName = "default";
-  const string kTableName = "foo";
-  const string kTableIdentifier = Substitute("$0.$1", kDatabaseName, kTableName);
-
-  KuduSchema schema;
-  KuduSchemaBuilder b;
-  b.AddColumn("key")->Type(KuduColumnSchema::INT32)->NotNull()->PrimaryKey();
-  b.AddColumn("val")->Type(KuduColumnSchema::INT32);
-  ASSERT_OK(b.Build(&schema));
-  unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
-  ASSERT_OK(table_creator->table_name(kTableIdentifier)
-                           .schema(&schema)
-                           .num_replicas(1)
-                           .set_range_partition_columns({ "key" })
-                           .Create());
-
-  // Verify the table exists in Kudu and the HMS.
-
-  bool exists;
-  ASSERT_OK(client_->TableExists(kTableIdentifier, &exists));
-  ASSERT_TRUE(exists);
-
-  thrift::ClientOptions hms_client_opts;
-  if (enable_kerberos_) {
-    // Create a principal 'kudu' and configure to use it.
+    // Create principals 'impala' and 'kudu'. Configure to use the latter.
+    ASSERT_OK(cluster_->kdc()->CreateUserPrincipal(kImpalaUser));
     ASSERT_OK(cluster_->kdc()->CreateUserPrincipal("kudu"));
     ASSERT_OK(cluster_->kdc()->Kinit("kudu"));
-    ASSERT_OK(cluster_->kdc()->SetKrb5Environment());
-    hms_client_opts.enable_kerberos = true;
-    hms_client_opts.service_principal = "hive";
+
+    thrift::ClientOptions hms_opts;
+    hms_opts.enable_kerberos = enable_kerberos;
+    hms_opts.service_principal = "hive";
+    hms_client_.reset(new HmsClient(cluster_->hms()->address(), hms_opts));
+    ASSERT_OK(hms_client_->Start());
+
+    thrift::ClientOptions sentry_opts;
+    sentry_opts.enable_kerberos = enable_kerberos;
+    sentry_opts.service_principal = "sentry";
+    sentry_client_.reset(new SentryClient(cluster_->sentry()->address(), sentry_opts));
+    ASSERT_OK(sentry_client_->Start());
+
+    // Group to role mapping:
+    // 1. group 'user' -> role 'developer',
+    // 2. group 'admin' -> role 'ad'.
+    ASSERT_OK(master::CreateRoleAndAddToGroups(sentry_client_.get(), kDevRole, kUserGroup));
+    ASSERT_OK(master::CreateRoleAndAddToGroups(sentry_client_.get(), kAdminRole, kAdminGroup));
+
+    // Grant privilege 'ALL' on database 'db' to role admin.
+    TSentryPrivilege privilege = master::GetDatabasePrivilege(
+        kDatabaseName, "ALL",
+        TSentryGrantOption::DISABLED);
+    ASSERT_OK(master::AlterRoleGrantPrivilege(sentry_client_.get(),
+                                              kAdminRole, privilege));
+
+    // Create database 'db' in HMS and Kudu tables 'table' and 'second_table', which
+    // are owned by user 'test-admin'.
+    ASSERT_OK(CreateDatabase(kDatabaseName));
+    ASSERT_OK(CreateKuduTable(kDatabaseName, kTableName));
+    ASSERT_OK(CreateKuduTable(kDatabaseName, kSecondTable));
+    NO_FATALS(CheckTable(kDatabaseName, kTableName,
+                         make_optional<const string&>(kAdminUser)));
+    NO_FATALS(CheckTable(kDatabaseName, kSecondTable,
+                         make_optional<const string&>(kAdminUser)));
+
+    // Log in as 'test-user' and reset the client to pick up the change in user.
+    ASSERT_OK(cluster_->kdc()->Kinit(kTestUser));
+    ASSERT_OK(cluster_->CreateClient(nullptr, &client_));
+  }
+
+  void TearDown() override {
+    if (sentry_client_) {
+        ASSERT_OK(sentry_client_->Stop());
+    }
+    if (hms_client_) {
+        ASSERT_OK(hms_client_->Stop());
+    }
+    HmsITestBase::TearDown();
   }
-  hms::HmsClient hms_client(cluster_->hms()->address(), hms_client_opts);
-  ASSERT_OK(hms_client.Start());
 
-  hive::Table hms_table;
-  ASSERT_OK(hms_client.GetTable(kDatabaseName, kTableName, &hms_table));
+ protected:
+  unique_ptr<SentryClient> sentry_client_;
+};
+
+const char* const SentryITestBase::kAdminGroup = "admin";
+const char* const SentryITestBase::kAdminUser = "test-admin";
+const char* const SentryITestBase::kUserGroup = "user";
+const char* const SentryITestBase::kTestUser = "test-user";
+const char* const SentryITestBase::kImpalaUser = "impala";
+const char* const SentryITestBase::kDevRole = "developer";
+const char* const SentryITestBase::kAdminRole = "ad";
+const char* const SentryITestBase::kDatabaseName = "db";
+const char* const SentryITestBase::kTableName = "table";
+const char* const SentryITestBase::kSecondTable = "second_table";
+
+// Functor that performs a certain operation (e.g. Create, Rename) on a table given
+// its name and its desired new name (only used for Rename).
+typedef function<Status(SentryITestBase*, const string&, const string&)> OperatorFunc;
+
+// Functor that grants the required permission for an operation (e.g. Create, Rename)
+// on a table given the database the table belongs to and the name of the table.
+typedef function<Status(SentryITestBase*, const string&, const string&)> PrivilegeFunc;
+
+// A description of the operation function that describes a particular action on a table
+// a user can perform, as well as the privilege granting function that grants the required
+// permission to the user to be able to perform the action.
+struct AuthzFuncs {
+  OperatorFunc do_action;
+  PrivilegeFunc grant_privileges;
+};
+
+// A description of an authorization process, including the protected resource (table),
+// the operation function, as well as the privilege granting function.
+struct AuthzDescriptor {
+  AuthzFuncs funcs;
+  string database;
+  string table_name;
+  string new_table_name;
+};
+
+// Test basic master authorization enforcement with Sentry and HMS integration
+// enabled.
+class MasterSentryTest : public SentryITestBase {};
+
+// Test that the trusted user can access the cluster without being authorized.
+TEST_F(MasterSentryTest, TestTrustedUserAcl) {
+  // Log in as 'impala' and reset the client to pick up the change in user.
+  ASSERT_OK(cluster_->kdc()->Kinit(kImpalaUser));
+  ASSERT_OK(cluster_->CreateClient(nullptr, &client_));
+
+  vector<string> tables;
+  ASSERT_OK(client_->ListTables(&tables));
+  unordered_set<string> tables_set(tables.begin(), tables.end());
+  ASSERT_EQ(unordered_set<string>({ Substitute("$0.$1", kDatabaseName, kTableName),
+                                    Substitute("$0.$1", kDatabaseName, kSecondTable) }),
+            tables_set);
+
+  ASSERT_OK(CreateKuduTable(kDatabaseName, "new_table"));
+  NO_FATALS(CheckTable(kDatabaseName, "new_table",
+                       make_optional<const string&>(kImpalaUser)));
+}
+
+TEST_F(MasterSentryTest, TestAuthzListTables) {
+  // ListTables is not parameterized as other operations below, because non-authorized
+  // tables will be filtered instead of returning NOT_AUTHORIZED error.
+  const auto table_name = Substitute("$0.$1", kDatabaseName, kTableName);
+  const auto sec_table_name = Substitute("$0.$1", kDatabaseName, kSecondTable);
+
+  // Listing tables shows nothing without proper privileges.
+  vector<string> tables;
+  ASSERT_OK(client_->ListTables(&tables));
+  ASSERT_TRUE(tables.empty());
+
+  // Listing tables only shows the tables which the user has proper privileges on.
+  tables.clear();
+  ASSERT_OK(GrantGetMetadataTablePrivilege(kDatabaseName, kTableName));
+  ASSERT_OK(client_->ListTables(&tables));
+  ASSERT_EQ(vector<string>({ table_name }), tables);
+
+  tables.clear();
+  ASSERT_OK(GrantGetMetadataTablePrivilege(kDatabaseName, kSecondTable));
+  ASSERT_OK(client_->ListTables(&tables));
+  unordered_set<string> tables_set(tables.begin(), tables.end());
+  ASSERT_EQ(unordered_set<string>({ table_name, sec_table_name }), tables_set);
+}
+
+TEST_F(MasterSentryTest, TestTableOwnership) {
+  ASSERT_OK(GrantCreateTablePrivilege(kDatabaseName, "new_table"));
+  ASSERT_OK(CreateKuduTable(kDatabaseName, "new_table"));
+  NO_FATALS(CheckTable(kDatabaseName, "new_table",
+                       make_optional<const string&>(kTestUser)));
+
+  // Checks the user with table ownership automatically has ALL privilege on the
+  // table. User 'test-user' can delete the same table without specifically granting
+  // 'DROP ON TABLE'. Note that ownership population between the HMS and the Sentry
+  // service happens synchronously, therefore, the table deletion should succeed
+  // right after the table creation.
+  // TODO(hao): test create a table with a different owner than the client’s username?
+  ASSERT_OK(client_->DeleteTable(Substitute("$0.$1", kDatabaseName, "new_table")));
+  NO_FATALS(CheckTableDoesNotExist(kDatabaseName, "new_table"));
+}
+
+// TODO(hao): enable the following tests after SENTRY-2471 is fixed.
+TEST_F(MasterSentryTest, DISABLED_TestRenameTablePrivilegeTransfer) {
+  ASSERT_OK(GrantRenameTablePrivilege(kDatabaseName, kTableName));
+  ASSERT_OK(RenameTable(Substitute("$0.$1", kDatabaseName, kTableName),
+                        Substitute("$0.$1", kDatabaseName, "b")));
+  NO_FATALS(CheckTable(kDatabaseName, "b", make_optional<const string &>(kAdminUser)));
+
+
+  // Checks table rename in the HMS is synchronized with the Sentry privileges.
+  unique_ptr<KuduTableAlterer> table_alterer;
+  table_alterer.reset(client_->NewTableAlterer(Substitute("$0.$1", kDatabaseName, "b"))
+                             ->DropColumn("int16_val"));
+
+  // Note that unlike table creation, there could be a delay between the table renaming
+  // in Kudu and the privilege renaming in Sentry. This is because Kudu uses the
+  // transactional listener of the HMS to get notification of table alteration events,
+  // while Sentry uses a post-event listener (which is executed outside the HMS
+  // transaction). There is a chance that Kudu has already finished the table renaming
+  // but the privilege renaming hasn't been reflected in the Sentry service.
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_OK(table_alterer->Alter());
+  });
+  NO_FATALS(CheckTable(kDatabaseName, "b", make_optional<const string&>(kAdminUser)));
+
+  table_alterer.reset(client_->NewTableAlterer(Substitute("$0.$1", kDatabaseName, "b"))
+                             ->RenameTo(Substitute("$0.$1", kDatabaseName, "c")));
+  ASSERT_OK(table_alterer->Alter());
+  NO_FATALS(CheckTable(kDatabaseName, "c", make_optional<const string&>(kAdminUser)));
+}
+
+class TestAuthzTable : public MasterSentryTest,
+                       public ::testing::WithParamInterface<AuthzDescriptor> {};
+
+TEST_P(TestAuthzTable, TestAuthorizeTable) {
+  AuthzDescriptor desc = GetParam();
+  const auto table_name = Substitute("$0.$1", desc.database, desc.table_name);
+  const auto new_table_name = Substitute("$0.$1", desc.database, desc.new_table_name);
+
+  // User 'test-user' attempts to operate on the table without proper privileges.
+  Status s = desc.funcs.do_action(this, table_name, new_table_name);
+  ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "unauthorized action");
+
+  // User 'test-user' can operate on the table after obtaining proper privileges.
+  ASSERT_OK(desc.funcs.grant_privileges(this, desc.database, desc.table_name));
+  ASSERT_OK(desc.funcs.do_action(this, table_name, new_table_name));
+
+  // Ensure that operating on a table while the Sentry is unreachable fails.
+  ASSERT_OK(StopSentry());
+  s = desc.funcs.do_action(this, table_name, new_table_name);
+  ASSERT_TRUE(s.IsNetworkError()) << s.ToString();
 }
 
+INSTANTIATE_TEST_CASE_P(AuthzCombinations,
+                        TestAuthzTable,
+                        ::testing::Values(
+        // Positional fields: {action, grant}, database, table, new table name (renames only).
+        AuthzDescriptor {
+          AuthzFuncs {
+            &SentryITestBase::CreateTable,
+            &SentryITestBase::GrantCreateTablePrivilege,
+          },
+          SentryITestBase::kDatabaseName,
+          "new_table",
+          "",
+        },
+
+        AuthzDescriptor {
+          AuthzFuncs {
+            &SentryITestBase::DeleteTable,
+            &SentryITestBase::GrantDropTablePrivilege,
+          },
+          SentryITestBase::kDatabaseName,
+          SentryITestBase::kTableName,
+          "",
+        },
+
+        AuthzDescriptor {
+          AuthzFuncs {
+            &SentryITestBase::AlterTable,
+            &SentryITestBase::GrantAlterTablePrivilege,
+          },
+          SentryITestBase::kDatabaseName,
+          SentryITestBase::kTableName,
+          "",
+        },
+
+        AuthzDescriptor {
+          AuthzFuncs {
+            &SentryITestBase::RenameTable,
+            &SentryITestBase::GrantRenameTablePrivilege,
+          },
+          SentryITestBase::kDatabaseName,
+          SentryITestBase::kTableName,
+          "new_table",
+        },
+
+        AuthzDescriptor {
+          AuthzFuncs {
+            &SentryITestBase::GetTableSchema,
+            &SentryITestBase::GrantGetMetadataTablePrivilege,
+          },
+          SentryITestBase::kDatabaseName,
+          SentryITestBase::kTableName,
+          "",
+        },
+
+        AuthzDescriptor {
+          AuthzFuncs {
+            &SentryITestBase::GetTableLocations,
+            &SentryITestBase::GrantGetMetadataTablePrivilege,
+          },
+          SentryITestBase::kDatabaseName,
+          SentryITestBase::kTableName,
+          "",
+        },
+
+        AuthzDescriptor {
+          AuthzFuncs {
+            &SentryITestBase::GetTabletLocations,
+            &SentryITestBase::GrantGetMetadataTablePrivilege,
+          },
+          SentryITestBase::kDatabaseName,
+          SentryITestBase::kTableName,
+          "",
+        },
+
+        AuthzDescriptor {
+          AuthzFuncs {
+            &SentryITestBase::IsCreateTableDone,
+            &SentryITestBase::GrantGetMetadataTablePrivilege,
+          },
+          SentryITestBase::kDatabaseName,
+          SentryITestBase::kTableName,
+          "",
+        },
+
+        AuthzDescriptor {
+          AuthzFuncs {
+            &SentryITestBase::IsAlterTableDone,
+            &SentryITestBase::GrantGetMetadataTablePrivilege,
+          },
+          SentryITestBase::kDatabaseName,
+          SentryITestBase::kTableName,
+          "",
+        }
+));
+
+// Test that when the table name and table ID in a request refer to different
+// tables, the user must be authorized on both tables before TABLE_NOT_FOUND
+// is returned; with privileges on neither or only one of them the request
+// fails with NOT_AUTHORIZED, to avoid leaking table existence.
+TEST_F(MasterSentryTest, TestMismatchedTable) {
+  const auto table_name_a = Substitute("$0.$1", kDatabaseName, kTableName);
+  const auto table_name_b = Substitute("$0.$1", kDatabaseName, kSecondTable);
+
+  // Log in as 'test-admin' to get the table ID.
+  ASSERT_OK(cluster_->kdc()->Kinit(kAdminUser));
+  shared_ptr<KuduClient> client;
+  ASSERT_OK(cluster_->CreateClient(nullptr, &client));
+  shared_ptr<KuduTable> table;
+  ASSERT_OK(client->OpenTable(table_name_a, &table));
+  optional<const string&> table_id_a = make_optional<const string&>(table->id());
+
+  // Log back in as 'test-user'.
+  ASSERT_OK(cluster_->kdc()->Kinit(kTestUser));
+
+  Status s = GetTableLocationsWithTableId(table_name_b, table_id_a);
+  ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "unauthorized action");
+
+  ASSERT_OK(GrantGetMetadataTablePrivilege(kDatabaseName, kTableName));
+  s = GetTableLocationsWithTableId(table_name_b, table_id_a);
+  ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "unauthorized action");
+
+  ASSERT_OK(GrantGetMetadataTablePrivilege(kDatabaseName, kSecondTable));
+  s = GetTableLocationsWithTableId(table_name_b, table_id_a);
+  ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "the table ID refers to a different table");
+}
+
+class AuthzErrorHandlingTest : public SentryITestBase,
+                               public ::testing::WithParamInterface<AuthzFuncs> {};
+TEST_P(AuthzErrorHandlingTest, TestNonExistentTable) {
+  AuthzFuncs funcs = GetParam();
+  const auto table_name = Substitute("$0.$1", kDatabaseName, "non_existent");
+  const auto new_table_name = Substitute("$0.$1", kDatabaseName, "b");
+
+  // Ensure that operating on a non-existent table without proper privileges gives
+  // a NOT_AUTHORIZED error, instead of leaking the table existence by giving a
+  // TABLE_NOT_FOUND error.
+  Status s = funcs.do_action(this, table_name, new_table_name);
+  ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "unauthorized action");
+
+  // Ensure that operating on a non-existent table while the Sentry is unreachable fails.
+  ASSERT_OK(StopSentry());
+  s = funcs.do_action(this, table_name, new_table_name);
+  ASSERT_TRUE(s.IsNetworkError()) << s.ToString();
+
+  // Ensure that operating on a non-existent table with proper privileges gives a
+  // TABLE_NOT_FOUND error.
+  ASSERT_OK(StartSentry());
+  ASSERT_EVENTUALLY([&] {
+    // SentryAuthzProvider throttles reconnections, so it's necessary
+    // to wait out the backoff.
+    ASSERT_OK(funcs.grant_privileges(this, kDatabaseName, "non_existent"));
+  });
+  s = funcs.do_action(this, table_name, new_table_name);
+  ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+}
+
+INSTANTIATE_TEST_CASE_P(AuthzFuncCombinations,
+                        AuthzErrorHandlingTest,
+                        ::testing::Values(
+        // Each entry pairs a table operation with the function granting the privilege it needs.
+        AuthzFuncs {
+          &SentryITestBase::DeleteTable,
+          &SentryITestBase::GrantDropTablePrivilege,
+        },
+
+        AuthzFuncs {
+          &SentryITestBase::AlterTable,
+          &SentryITestBase::GrantAlterTablePrivilege,
+        },
+
+        AuthzFuncs {
+          &SentryITestBase::RenameTable,
+          &SentryITestBase::GrantRenameTablePrivilege,
+        },
+
+        AuthzFuncs {
+          &SentryITestBase::GetTableSchema,
+          &SentryITestBase::GrantGetMetadataTablePrivilege,
+        },
+
+        AuthzFuncs {
+          &SentryITestBase::GetTableLocations,
+          &SentryITestBase::GrantGetMetadataTablePrivilege,
+        },
+
+        AuthzFuncs {
+          &SentryITestBase::IsCreateTableDone,
+          &SentryITestBase::GrantGetMetadataTablePrivilege,
+        },
+
+        AuthzFuncs {
+          &SentryITestBase::IsAlterTableDone,
+          &SentryITestBase::GrantGetMetadataTablePrivilege,
+        }
+));
 } // namespace kudu
diff --git a/src/kudu/integration-tests/raft_config_change-itest.cc b/src/kudu/integration-tests/raft_config_change-itest.cc
index c17f5c6..94af01d 100644
--- a/src/kudu/integration-tests/raft_config_change-itest.cc
+++ b/src/kudu/integration-tests/raft_config_change-itest.cc
@@ -47,6 +47,7 @@
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
 
+using boost::none;
 using kudu::consensus::ADD_PEER;
 using kudu::consensus::COMMITTED_OPID;
 using kudu::consensus::ConsensusStatePB;
@@ -124,7 +125,7 @@ TEST_F(RaftConfigChangeITest, TestKudu2147) {
   ASSERT_OK(inspect_->WaitForReplicaCount(3));
   master::GetTableLocationsResponsePB table_locations;
   ASSERT_OK(GetTableLocations(cluster_->master_proxy(), TestWorkload::kDefaultTableName,
-                              kTimeout, VOTER_REPLICA, &table_locations));
+                              kTimeout, VOTER_REPLICA, /*table_id=*/none, &table_locations));
   ASSERT_EQ(1, table_locations.tablet_locations_size()); // Only 1 tablet.
   ASSERT_EQ(3, table_locations.tablet_locations().begin()->replicas_size()); // 3 replicas.
   string tablet_id = table_locations.tablet_locations().begin()->tablet_id();
@@ -189,7 +190,7 @@ TEST_F(RaftConfigChangeITest, TestNonVoterPromotion) {
   ASSERT_OK(inspect_->WaitForReplicaCount(3));
   master::GetTableLocationsResponsePB table_locations;
   ASSERT_OK(GetTableLocations(cluster_->master_proxy(), TestWorkload::kDefaultTableName,
-                              kTimeout, VOTER_REPLICA, &table_locations));
+                              kTimeout, VOTER_REPLICA, /*table_id=*/none, &table_locations));
   ASSERT_EQ(1, table_locations.tablet_locations_size()); // Only 1 tablet.
   ASSERT_EQ(3, table_locations.tablet_locations().begin()->replicas_size()); // 3 replicas.
   string tablet_id = table_locations.tablet_locations().begin()->tablet_id();
@@ -247,7 +248,7 @@ TEST_F(RaftConfigChangeITest, TestBulkChangeConfig) {
   ASSERT_OK(inspect_->WaitForReplicaCount(kNumInitialReplicas));
   master::GetTableLocationsResponsePB table_locations;
   ASSERT_OK(GetTableLocations(cluster_->master_proxy(), TestWorkload::kDefaultTableName,
-                              kTimeout, VOTER_REPLICA, &table_locations));
+                              kTimeout, VOTER_REPLICA, /*table_id=*/none, &table_locations));
   ASSERT_EQ(1, table_locations.tablet_locations_size()); // Only 1 tablet.
   ASSERT_EQ(kNumInitialReplicas, table_locations.tablet_locations().begin()->replicas_size());
   string tablet_id = table_locations.tablet_locations().begin()->tablet_id();
diff --git a/src/kudu/integration-tests/raft_consensus_nonvoter-itest.cc b/src/kudu/integration-tests/raft_consensus_nonvoter-itest.cc
index 2a11642..48081d2 100644
--- a/src/kudu/integration-tests/raft_consensus_nonvoter-itest.cc
+++ b/src/kudu/integration-tests/raft_consensus_nonvoter-itest.cc
@@ -25,6 +25,7 @@
 #include <utility>
 #include <vector>
 
+#include <boost/optional/optional.hpp>
 #include <gflags/gflags_declare.h>
 #include <glog/logging.h>
 #include <gtest/gtest.h>
@@ -66,6 +67,7 @@ DECLARE_double(leader_failure_max_missed_heartbeat_periods);
 METRIC_DECLARE_gauge_int32(tablet_copy_open_client_sessions);
 METRIC_DECLARE_gauge_int32(tablet_copy_open_source_sessions);
 
+using boost::none;
 using kudu::client::sp::shared_ptr;
 using kudu::client::KuduClient;
 using kudu::client::KuduClientBuilder;
@@ -275,7 +277,8 @@ TEST_F(RaftConsensusNonVoterITest, GetTableAndTabletLocations) {
   {
     GetTableLocationsResponsePB table_locations;
     ASSERT_OK(GetTableLocations(cluster_->master_proxy(), table_->name(),
-                                kTimeout, VOTER_REPLICA, &table_locations));
+                                kTimeout, VOTER_REPLICA, /*table_id=*/none,
+                                &table_locations));
     ASSERT_EQ(1, table_locations.tablet_locations().size());
     const TabletLocationsPB& locations = table_locations.tablet_locations(0);
     ASSERT_EQ(tablet_id_, locations.tablet_id());
@@ -288,7 +291,8 @@ TEST_F(RaftConsensusNonVoterITest, GetTableAndTabletLocations) {
   {
     GetTableLocationsResponsePB table_locations;
     ASSERT_OK(GetTableLocations(cluster_->master_proxy(), table_->name(),
-                                kTimeout, ANY_REPLICA, &table_locations));
+                                kTimeout, ANY_REPLICA, /*table_id=*/none,
+                                &table_locations));
     ASSERT_EQ(1, table_locations.tablet_locations().size());
     const TabletLocationsPB& locations = table_locations.tablet_locations(0);
     ASSERT_EQ(tablet_id_, locations.tablet_id());
diff --git a/src/kudu/integration-tests/registration-test.cc b/src/kudu/integration-tests/registration-test.cc
index 6414127..74fcf36 100644
--- a/src/kudu/integration-tests/registration-test.cc
+++ b/src/kudu/integration-tests/registration-test.cc
@@ -23,6 +23,7 @@
 #include <string>
 #include <vector>
 
+#include <boost/optional/optional.hpp>
 #include <gflags/gflags_declare.h>
 #include <glog/logging.h>
 #include <gtest/gtest.h>
@@ -67,6 +68,7 @@ DECLARE_int32(heartbeat_interval_ms);
 METRIC_DECLARE_counter(rows_inserted);
 METRIC_DECLARE_counter(rows_updated);
 
+using boost::none;
 using std::shared_ptr;
 using std::string;
 using std::vector;
@@ -114,7 +116,7 @@ void CreateTableForTesting(MiniMaster* mini_master,
     {
       CatalogManager::ScopedLeaderSharedLock l(catalog);
       ASSERT_OK(l.first_failed_status());
-      ASSERT_OK(catalog->IsCreateTableDone(&req, &resp));
+      ASSERT_OK(catalog->IsCreateTableDone(&req, &resp, /*user=*/none));
     }
     if (resp.done()) {
       is_table_created = true;
@@ -198,7 +200,7 @@ class RegistrationTest : public KuduTest {
           break;  // exiting out of the 'do {...} while (false)' scope
         }
         RETURN_NOT_OK(ls);
-        s = catalog->GetTabletLocations(tablet_id, master::VOTER_REPLICA, &loc);
+        s = catalog->GetTabletLocations(tablet_id, master::VOTER_REPLICA, &loc, /*user=*/none);
       } while (false);
       if (s.ok() && loc.replicas_size() == expected_count) {
         if (locations) {
diff --git a/src/kudu/integration-tests/tablet_copy-itest.cc b/src/kudu/integration-tests/tablet_copy-itest.cc
index b3a8a94..2a5fbbf 100644
--- a/src/kudu/integration-tests/tablet_copy-itest.cc
+++ b/src/kudu/integration-tests/tablet_copy-itest.cc
@@ -28,6 +28,7 @@
 #include <utility>
 #include <vector>
 
+#include <boost/optional/optional.hpp>
 #include <gflags/gflags.h>
 #include <glog/logging.h>
 #include <gtest/gtest.h>
@@ -91,6 +92,7 @@ DEFINE_int32(test_delete_leader_payload_bytes, 16 * 1024,
 DEFINE_int32(test_delete_leader_num_writer_threads, 1,
              "Number of writer threads in TestDeleteLeaderDuringTabletCopyStressTest.");
 
+using boost::none;
 using kudu::client::KuduSchema;
 using kudu::client::KuduTableCreator;
 using kudu::cluster::ExternalMiniClusterOptions;
@@ -1249,7 +1251,8 @@ TEST_P(TabletCopyFailureITest, TestTabletCopyNewReplicaFailureCanVote) {
   ASSERT_OK(inspect_->WaitForReplicaCount(kNumReplicas));
   master::GetTableLocationsResponsePB table_locations;
   ASSERT_OK(itest::GetTableLocations(cluster_->master_proxy(), TestWorkload::kDefaultTableName,
-                                     kTimeout, master::VOTER_REPLICA, &table_locations));
+                                     kTimeout, master::VOTER_REPLICA, /*table_id=*/none,
+                                     &table_locations));
   ASSERT_EQ(1, table_locations.tablet_locations_size());
   string tablet_id = table_locations.tablet_locations(0).tablet_id();
   set<string> replica_uuids;
diff --git a/src/kudu/integration-tests/tombstoned_voting-itest.cc b/src/kudu/integration-tests/tombstoned_voting-itest.cc
index 9c28cd9..484b575 100644
--- a/src/kudu/integration-tests/tombstoned_voting-itest.cc
+++ b/src/kudu/integration-tests/tombstoned_voting-itest.cc
@@ -21,6 +21,7 @@
 #include <string>
 #include <unordered_map>
 
+#include <boost/optional/optional.hpp>
 #include <gtest/gtest.h>
 
 #include "kudu/consensus/consensus-test-util.h"
@@ -42,6 +43,7 @@
 
 METRIC_DECLARE_histogram(handler_latency_kudu_master_MasterService_TSHeartbeat);
 
+using boost::none;
 using kudu::cluster::ExternalMaster;
 using kudu::cluster::ExternalTabletServer;
 using kudu::consensus::MakeOpId;
@@ -90,7 +92,8 @@ TEST_F(TombstonedVotingITest, TestTombstonedReplicaWithoutCMetaCanVote) {
   ASSERT_OK(inspect_->WaitForReplicaCount(3));
   master::GetTableLocationsResponsePB table_locations;
   ASSERT_OK(itest::GetTableLocations(cluster_->master_proxy(), TestWorkload::kDefaultTableName,
-                                     kTimeout, master::VOTER_REPLICA, &table_locations));
+                                     kTimeout, master::VOTER_REPLICA, /*table_id=*/none,
+                                     &table_locations));
   ASSERT_EQ(1, table_locations.tablet_locations_size());
   string tablet_id = table_locations.tablet_locations(0).tablet_id();
   set<string> replica_uuids;
diff --git a/src/kudu/master/CMakeLists.txt b/src/kudu/master/CMakeLists.txt
index 79f38ce..ba68439 100644
--- a/src/kudu/master/CMakeLists.txt
+++ b/src/kudu/master/CMakeLists.txt
@@ -33,6 +33,7 @@ ADD_EXPORTABLE_LIBRARY(master_proto
   NONLINK_DEPS ${MASTER_KRPC_TGTS})
 
 set(MASTER_SRCS
+  authz_provider.cc
   catalog_manager.cc
   hms_notification_log_listener.cc
   location_cache.cc
diff --git a/src/kudu/master/authz_provider.cc b/src/kudu/master/authz_provider.cc
new file mode 100644
index 0000000..89cebc8
--- /dev/null
+++ b/src/kudu/master/authz_provider.cc
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/master/authz_provider.h"
+
+#include <iterator>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <gflags/gflags.h>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/strings/split.h"
+#include "kudu/util/flag_tags.h"
+
+DEFINE_string(trusted_user_acl, "",
+    "Comma-separated list of trusted users who may access the cluster "
+    "without being authorized against fine-grained permissions.");
+TAG_FLAG(trusted_user_acl, experimental);
+
+using std::string;
+using std::vector;
+
+namespace kudu {
+namespace master {
+
+AuthzProvider::AuthzProvider() {  // Reads --trusted_user_acl; empty entries are skipped.
+  vector<string> acl = strings::Split(FLAGS_trusted_user_acl, ",", strings::SkipEmpty());
+  trusted_users_.insert(std::make_move_iterator(acl.begin()), std::make_move_iterator(acl.end()));
+}
+
+bool AuthzProvider::IsTrustedUser(const string& user) {
+  return trusted_users_.find(user) != trusted_users_.end();  // Exact string match.
+}
+
+} // namespace master
+} // namespace kudu
diff --git a/src/kudu/master/authz_provider.h b/src/kudu/master/authz_provider.h
index 417dd55..d45f03f 100644
--- a/src/kudu/master/authz_provider.h
+++ b/src/kudu/master/authz_provider.h
@@ -18,6 +18,7 @@
 #pragma once
 
 #include <string>
+#include <unordered_set>
 
 #include "kudu/gutil/port.h"
 #include "kudu/util/status.h"
@@ -29,6 +30,8 @@ namespace master {
 class AuthzProvider {
  public:
 
+  AuthzProvider();
+
   // Starts the AuthzProvider instance.
   virtual Status Start() = 0;
 
@@ -69,6 +72,13 @@ class AuthzProvider {
                                            const std::string& user) WARN_UNUSED_RESULT = 0;
 
   virtual ~AuthzProvider() {}
+
+  // Checks if the given user is trusted and thus can be exempted from
+  // authorization validation.
+  bool IsTrustedUser(const std::string& user);
+
+ private:
+  std::unordered_set<std::string> trusted_users_;
 };
 
 } // namespace master
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index 46e16e4..660c3d5 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -260,6 +260,8 @@ DECLARE_int64(tsk_rotation_seconds);
 
 using base::subtle::NoBarrier_CompareAndSwap;
 using base::subtle::NoBarrier_Load;
+using boost::make_optional;
+using boost::none;
 using boost::optional;
 using kudu::cfile::TypeEncodingInfo;
 using kudu::consensus::ConsensusServiceProxy;
@@ -753,6 +755,8 @@ Status CatalogManager::Init(bool is_first_run) {
     }
   }
 
+  RETURN_NOT_OK_PREPEND(authz_provider_->Start(), "failed to start Authz Provider");
+
   background_tasks_.reset(new CatalogManagerBgTasks(this));
   RETURN_NOT_OK_PREPEND(background_tasks_->Init(),
                         "Failed to initialize catalog manager background tasks");
@@ -1231,6 +1235,10 @@ void CatalogManager::Shutdown() {
     background_tasks_->Shutdown();
   }
 
+  if (authz_provider_) {
+    authz_provider_->Stop();
+  }
+
   if (hms_catalog_) {
     hms_notification_log_listener_->Shutdown();
     hms_catalog_->Stop();
@@ -1322,7 +1330,7 @@ Status ValidateIdentifier(const string& id) {
 // Validate the client-provided schema and name.
 Status ValidateClientSchema(const optional<string>& name,
                             const Schema& schema) {
-  if (name != boost::none) {
+  if (name) {
     RETURN_NOT_OK_PREPEND(ValidateIdentifier(name.get()), "invalid table name");
   }
   for (int i = 0; i < schema.num_columns(); i++) {
@@ -1397,9 +1405,17 @@ Status CatalogManager::CreateTable(const CreateTableRequestPB* orig_req,
   }
 
   // a. Validate the user request.
+  const string& normalized_table_name = NormalizeTableName(req.name());
+  if (rpc) {
+    const string& user = rpc->remote_user().username();
+    const string& owner = req.has_owner() ? req.owner() : user;
+    RETURN_NOT_OK(SetupError(
+        authz_provider_->AuthorizeCreateTable(normalized_table_name, user, owner),
+        resp, MasterErrorPB::NOT_AUTHORIZED));
+  }
+
   Schema client_schema;
   RETURN_NOT_OK(SchemaFromPB(req.schema(), &client_schema));
-  const string normalized_table_name = NormalizeTableName(req.name());
 
   RETURN_NOT_OK(SetupError(ValidateClientSchema(normalized_table_name, client_schema),
                            resp, MasterErrorPB::INVALID_SCHEMA));
@@ -1607,7 +1623,8 @@ Status CatalogManager::CreateTable(const CreateTableRequestPB* orig_req,
   // It is critical that this step happen before writing the table to the sys catalog,
   // since this step validates that the table name is available in the HMS catalog.
   if (hms_catalog_) {
-    string owner = req.has_owner() ? req.owner() : rpc->remote_user().username();
+    CHECK(rpc);
+    const string& owner = req.has_owner() ? req.owner() : rpc->remote_user().username();
     Status s = hms_catalog_->CreateTable(table->id(), normalized_table_name, owner, schema);
     if (!s.ok()) {
       s = s.CloneAndPrepend(Substitute("an error occurred while creating table $0 in the HMS",
@@ -1680,14 +1697,21 @@ Status CatalogManager::CreateTable(const CreateTableRequestPB* orig_req,
 }
 
 Status CatalogManager::IsCreateTableDone(const IsCreateTableDoneRequestPB* req,
-                                         IsCreateTableDoneResponsePB* resp) {
+                                         IsCreateTableDoneResponsePB* resp,
+                                         optional<const string&> user) {
   leader_lock_.AssertAcquiredForReading();
   RETURN_NOT_OK(CheckOnline());
 
-  // 1. Lookup the table and verify if it exists
+  // 1. Lookup the table, verify if it exists, and then check that
+  //    the user is authorized to operate on the table.
   scoped_refptr<TableInfo> table;
   TableMetadataLock l;
-  RETURN_NOT_OK(FindAndLockTable(*req, resp, LockMode::READ, &table, &l));
+  auto authz_func = [&] (const string& username, const string& table_name) {
+    return SetupError(authz_provider_->AuthorizeGetTableMetadata(table_name, username),
+                      resp, MasterErrorPB::NOT_AUTHORIZED);
+  };
+  RETURN_NOT_OK(FindLockAndAuthorizeTable(*req, resp, LockMode::READ, authz_func, user,
+                                          &table, &l));
   RETURN_NOT_OK(CheckIfTableDeletedOrNotRunning(&l, resp));
 
   // 2. Verify if the create is in-progress
@@ -1727,22 +1751,49 @@ scoped_refptr<TabletInfo> CatalogManager::CreateTabletInfo(const scoped_refptr<T
   return tablet;
 }
 
-template<typename ReqClass, typename RespClass>
-Status CatalogManager::FindAndLockTable(const ReqClass& request,
-                                        RespClass* response,
-                                        LockMode lock_mode,
-                                        scoped_refptr<TableInfo>* table_info,
-                                        TableMetadataLock* table_lock) {
-  TRACE("Looking up and locking table");
+template<typename ReqClass, typename RespClass, typename F>
+Status CatalogManager::FindLockAndAuthorizeTable(
+    const ReqClass& request,
+    RespClass* response,
+    LockMode lock_mode,
+    F authz_func,
+    optional<const string&> user,
+    scoped_refptr<TableInfo>* table_info,
+    TableMetadataLock* table_lock) {
+  TRACE("Looking up, locking, and authorizing table");
   const TableIdentifierPB& table_identifier = request.table();
 
+  // For authorization, the name of the table to validate against depends on
+  // whether the request contains a table name and/or a table ID, as shown below.
+  // *----------------*--------------*---------------------*-----------------*
+  // | HAS TABLE NAME | HAS TABLE ID | TABLE NAME/ID MATCH | AUTHZ NAME      |
+  // *----------------*--------------*---------------------*-----------------*
+  // | YES            | YES          | YES                 | TABLE NAME      |
+  // *----------------*--------------*---------------------*-----------------*
+  // | YES            | YES          | NO                  | TABLE NAME/ID   |
+  // *----------------*--------------*---------------------*-----------------*
+  // | YES            | NO           | N/A                 | TABLE NAME      |
+  // *----------------*--------------*---------------------*-----------------*
+  // | NO             | YES          | N/A                 | TABLE ID        |
+  // *----------------*--------------*---------------------*-----------------*
+  // | NO             | NO           | N/A                 | InvalidArgument |
+  // *----------------*--------------*---------------------*-----------------*
+  auto authorize = [&] (const string& name) {
+    if (user) {
+      return authz_func(*user, name);
+    }
+    return Status::OK();
+  };
+
   auto tnf_error = [&] {
-    return SetupError(Status::NotFound(
-          "the table does not exist", SecureShortDebugString(table_identifier)),
+    return SetupError(
+        Status::NotFound("the table does not exist", SecureShortDebugString(table_identifier)),
         response, MasterErrorPB::TABLE_NOT_FOUND);
   };
 
   scoped_refptr<TableInfo> table;
+  // Set to true if the client-provided table name and ID refer to different tables.
+  bool mismatched_table = false;
   {
     shared_lock<LockType> l(lock_);
     if (table_identifier.has_table_id()) {
@@ -1753,7 +1804,7 @@ Status CatalogManager::FindAndLockTable(const ReqClass& request,
       if (table_identifier.has_table_name() &&
           table.get() != FindPtrOrNull(normalized_table_names_map_,
                                        NormalizeTableName(table_identifier.table_name())).get()) {
-        return tnf_error();
+        mismatched_table = true;
       }
     } else if (table_identifier.has_table_name()) {
       table = FindPtrOrNull(normalized_table_names_map_,
@@ -1765,15 +1816,41 @@ Status CatalogManager::FindAndLockTable(const ReqClass& request,
   }
 
   // If the table doesn't exist, don't attempt to lock it.
+  //
+  // If the request contains table name and the user is authorized to operate
+  // on the table, then return TABLE_NOT_FOUND error. Otherwise, return
+  // NOT_AUTHORIZED error, to avoid leaking table existence.
   if (!table) {
+    if (table_identifier.has_table_name()) {
+      RETURN_NOT_OK(authorize(NormalizeTableName(table_identifier.table_name())));
+    }
     return tnf_error();
   }
 
-  // Acquire the table lock.
+  // Acquire the table lock, then validate that the operation on the table
+  // found is authorized.
   TableMetadataLock lock(table.get(), lock_mode);
+  string table_name = NormalizeTableName(lock.data().name());
+  RETURN_NOT_OK(authorize(table_name));
+
+  // If the table name and table ID refer to different tables, for example,
+  //   1. the ID maps to table A.
+  //   2. the name maps to table B.
+  //
+  // Authorize user against both tables, then return TABLE_NOT_FOUND error to
+  // avoid leaking table existence.
+  if (mismatched_table) {
+    RETURN_NOT_OK(authorize(NormalizeTableName(table_identifier.table_name())));
+    return SetupError(
+        Status::NotFound(
+            Substitute("the table ID refers to a different table '$0' than '$1'",
+                       table_name, table_identifier.table_name()),
+            SecureShortDebugString(table_identifier)),
+        response, MasterErrorPB::TABLE_NOT_FOUND);
+  }
 
   if (table_identifier.has_table_name() &&
-      NormalizeTableName(table_identifier.table_name()) != NormalizeTableName(lock.data().name())) {
+      NormalizeTableName(table_identifier.table_name()) != table_name) {
     // We've encountered the table while it's in the process of being renamed;
     // pretend it doesn't yet exist.
     return tnf_error();
@@ -1793,6 +1870,9 @@ Status CatalogManager::DeleteTableRpc(const DeleteTableRequestPB& req,
   leader_lock_.AssertAcquiredForReading();
   RETURN_NOT_OK(CheckOnline());
 
+  optional<const string&> user = rpc ?
+      make_optional<const string&>(rpc->remote_user().username()) : none;
+
   // If the HMS integration is enabled and the table should be deleted in the HMS,
   // then don't directly remove the table from the Kudu catalog. Instead, delete
   // the table from the HMS and wait for the notification log listener to apply
@@ -1804,10 +1884,16 @@ Status CatalogManager::DeleteTableRpc(const DeleteTableRequestPB& req,
     // renamed in the HMS.
     RETURN_NOT_OK(WaitForNotificationLogListenerCatchUp(resp, rpc));
 
-    // Look up and lock the table.
+    // Look up the table, lock it and then check that the user is authorized
+    // to operate on the table.
     scoped_refptr<TableInfo> table;
     TableMetadataLock l;
-    RETURN_NOT_OK(FindAndLockTable(req, resp, LockMode::READ, &table, &l));
+    auto authz_func = [&](const string& username, const string& table_name) {
+      return SetupError(authz_provider_->AuthorizeDropTable(table_name, username),
+                        resp, MasterErrorPB::NOT_AUTHORIZED);
+    };
+    RETURN_NOT_OK(FindLockAndAuthorizeTable(req, resp, LockMode::READ, authz_func, user,
+                                            &table, &l));
     if (l.data().is_deleted()) {
       return SetupError(Status::NotFound("the table was deleted", l.data().pb.state_msg()),
           resp, MasterErrorPB::TABLE_NOT_FOUND);
@@ -1826,7 +1912,7 @@ Status CatalogManager::DeleteTableRpc(const DeleteTableRequestPB& req,
 
   // If the HMS integration isn't enabled or the deletion should only happen in Kudu,
   // then delete the table directly from the Kudu catalog.
-  return DeleteTable(req, resp, boost::none);
+  return DeleteTable(req, resp, /*hms_notification_log_event_id=*/none, user);
 }
 
 Status CatalogManager::DeleteTableHms(const string& table_name,
@@ -1842,7 +1928,10 @@ Status CatalogManager::DeleteTableHms(const string& table_name,
   req.mutable_table()->set_table_name(table_name);
   req.mutable_table()->set_table_id(table_id);
 
-  RETURN_NOT_OK(DeleteTable(req, &resp, notification_log_event_id));
+  // Use empty user to skip the authorization validation since the operation
+  // originates from catalog manager. Moreover, this avoids duplicate effort,
+  // because we already perform authorization before making any changes to the HMS.
+  RETURN_NOT_OK(DeleteTable(req, &resp, notification_log_event_id, /*user=*/none));
 
   // Update the cached HMS notification log event ID, if it changed.
   DCHECK_GT(notification_log_event_id, hms_notification_log_event_id_);
@@ -1853,14 +1942,21 @@ Status CatalogManager::DeleteTableHms(const string& table_name,
 
 Status CatalogManager::DeleteTable(const DeleteTableRequestPB& req,
                                    DeleteTableResponsePB* resp,
-                                   optional<int64_t> hms_notification_log_event_id) {
+                                   optional<int64_t> hms_notification_log_event_id,
+                                   optional<const string&> user) {
   leader_lock_.AssertAcquiredForReading();
   RETURN_NOT_OK(CheckOnline());
 
-  // 1. Look up the table, lock it, and mark it as removed.
+  // 1. Look up the table, lock it, and then check that the user is authorized
+  //    to operate on the table. Last, mark it as removed.
   scoped_refptr<TableInfo> table;
   TableMetadataLock l;
-  RETURN_NOT_OK(FindAndLockTable(req, resp, LockMode::WRITE, &table, &l));
+  auto authz_func = [&] (const string& username, const string& table_name) {
+    return SetupError(authz_provider_->AuthorizeDropTable(table_name, username),
+                      resp, MasterErrorPB::NOT_AUTHORIZED);
+  };
+  RETURN_NOT_OK(FindLockAndAuthorizeTable(req, resp, LockMode::WRITE, authz_func, user,
+                                          &table, &l));
   if (l.data().is_deleted()) {
     return SetupError(Status::NotFound("the table was deleted", l.data().pb.state_msg()),
         resp, MasterErrorPB::TABLE_NOT_FOUND);
@@ -2195,6 +2291,9 @@ Status CatalogManager::AlterTableRpc(const AlterTableRequestPB& req,
   LOG(INFO) << Substitute("Servicing AlterTable request from $0:\n$1",
                           RequestorString(rpc), SecureShortDebugString(req));
 
+  optional<const string&> user = rpc ?
+      make_optional<const string&>(rpc->remote_user().username()) : none;
+
   // If the HMS integration is enabled, the alteration includes a table
   // rename and the table should be altered in the HMS, then don't directly
   // rename the table in the Kudu catalog. Instead, rename the table
@@ -2202,12 +2301,21 @@ Status CatalogManager::AlterTableRpc(const AlterTableRequestPB& req,
   // that event to the catalog. By 'serializing' the rename through the
   // HMS, race conditions are avoided.
   if (hms_catalog_ && req.has_new_table_name() && req.modify_external_catalogs()) {
-    // Look up the table and lock it.
+    // Look up the table, lock it and then check that the user is authorized
+    // to operate on the table.
     scoped_refptr<TableInfo> table;
     TableMetadataLock l;
-    RETURN_NOT_OK(FindAndLockTable(req, resp, LockMode::READ, &table, &l));
-    RETURN_NOT_OK(CheckIfTableDeletedOrNotRunning(&l, resp));
     string normalized_new_table_name = NormalizeTableName(req.new_table_name());
+    auto authz_func = [&](const string& username,
+                          const string& table_name) {
+      return SetupError(authz_provider_->AuthorizeAlterTable(table_name,
+                                                             normalized_new_table_name,
+                                                             username),
+                        resp, MasterErrorPB::NOT_AUTHORIZED);
+    };
+    RETURN_NOT_OK(FindLockAndAuthorizeTable(req, resp, LockMode::READ, authz_func, user,
+                                            &table, &l));
+    RETURN_NOT_OK(CheckIfTableDeletedOrNotRunning(&l, resp));
 
     // The HMS allows renaming a table to the same name (ALTER TABLE t RENAME TO t),
     // however Kudu does not, so we must enforce this constraint ourselves before
@@ -2237,15 +2345,24 @@ Status CatalogManager::AlterTableRpc(const AlterTableRequestPB& req,
     // Finally, apply the remaining schema and partitioning alterations to the
     // local catalog. Since Kudu holds the canonical version of table schemas
     // and partitions the HMS is not updated first.
+    //
+    // Note that we pass empty user to AlterTable() to skip the authorization
+    // validation since we already perform authorization before making any
+    // changes to the HMS. Moreover, even though a table renaming could happen
+    // before the remaining schema and partitioning alterations take place,
+    // it is preferable, from the users' point of view, not to authorize against
+    // a new table name that arose from other RPCs.
     AlterTableRequestPB r(req);
     r.mutable_table()->clear_table_name();
     r.mutable_table()->set_table_id(table->id());
     r.clear_new_table_name();
 
-    return AlterTable(r, resp, boost::none);
+    return AlterTable(r, resp,
+                      /*hms_notification_log_event_id=*/none,
+                      /*user=*/none);
   }
 
-  return AlterTable(req, resp, boost::none);
+  return AlterTable(req, resp, /*hms_notification_log_event_id=*/ none, user);
 }
 
 Status CatalogManager::RenameTableHms(const string& table_id,
@@ -2258,7 +2375,10 @@ Status CatalogManager::RenameTableHms(const string& table_id,
   req.mutable_table()->set_table_name(table_name);
   req.set_new_table_name(new_table_name);
 
-  RETURN_NOT_OK(AlterTable(req, &resp, notification_log_event_id));
+  // Use empty user to skip the authorization validation since the operation
+  // originates from catalog manager. Moreover, this avoids duplicate effort,
+  // because we already perform authorization before making any changes to the HMS.
+  RETURN_NOT_OK(AlterTable(req, &resp, notification_log_event_id, /*user=*/none));
 
   // Update the cached HMS notification log event ID.
   DCHECK_GT(notification_log_event_id, hms_notification_log_event_id_);
@@ -2269,7 +2389,8 @@ Status CatalogManager::RenameTableHms(const string& table_id,
 
 Status CatalogManager::AlterTable(const AlterTableRequestPB& req,
                                   AlterTableResponsePB* resp,
-                                  optional<int64_t> hms_notification_log_event_id) {
+                                  optional<int64_t> hms_notification_log_event_id,
+                                  optional<const string&> user) {
   leader_lock_.AssertAcquiredForReading();
   RETURN_NOT_OK(CheckOnline());
 
@@ -2296,10 +2417,19 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB& req,
     }
   }
 
-  // 2. Lookup the table, verify if it exists, and lock it for modification.
+  // 2. Lookup the table, verify if it exists, lock it for modification, and then
+  //    check that the user is authorized to operate on the table.
   scoped_refptr<TableInfo> table;
   TableMetadataLock l;
-  RETURN_NOT_OK(FindAndLockTable(req, resp, LockMode::WRITE, &table, &l));
+  auto authz_func = [&] (const string& username,
+                         const string& table_name) {
+    const string new_table = req.has_new_table_name() ?
+        NormalizeTableName(req.new_table_name()) : table_name;
+    return SetupError(authz_provider_->AuthorizeAlterTable(table_name, new_table, username),
+                      resp, MasterErrorPB::NOT_AUTHORIZED);
+  };
+  RETURN_NOT_OK(FindLockAndAuthorizeTable(req, resp, LockMode::WRITE, authz_func, user,
+                                          &table, &l));
   if (l.data().is_deleted()) {
     return SetupError(
         Status::NotFound("the table was deleted", l.data().pb.state_msg()),
@@ -2327,7 +2457,7 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB& req,
 
   // Just validate the schema, not the name (validated below).
   RETURN_NOT_OK(SetupError(
-        ValidateClientSchema(boost::none, new_schema),
+        ValidateClientSchema(none, new_schema),
         resp, MasterErrorPB::INVALID_SCHEMA));
 
   // 4. Validate and try to acquire the new table name.
@@ -2547,14 +2677,20 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB& req,
 
 Status CatalogManager::IsAlterTableDone(const IsAlterTableDoneRequestPB* req,
                                         IsAlterTableDoneResponsePB* resp,
-                                        rpc::RpcContext* rpc) {
+                                        optional<const string&> user) {
   leader_lock_.AssertAcquiredForReading();
   RETURN_NOT_OK(CheckOnline());
 
-  // 1. Lookup the table and verify if it exists
+  // 1. Lookup the table, verify if it exists, and then check that
+  //    the user is authorized to operate on the table.
   scoped_refptr<TableInfo> table;
   TableMetadataLock l;
-  RETURN_NOT_OK(FindAndLockTable(*req, resp, LockMode::READ, &table, &l));
+  auto authz_func = [&] (const string& username, const string& table_name) {
+    return SetupError(authz_provider_->AuthorizeGetTableMetadata(table_name, username),
+                      resp, MasterErrorPB::NOT_AUTHORIZED);
+  };
+  RETURN_NOT_OK(FindLockAndAuthorizeTable(*req, resp, LockMode::READ, authz_func, user,
+                                          &table, &l));
   RETURN_NOT_OK(CheckIfTableDeletedOrNotRunning(&l, resp));
 
   // 2. Verify if the alter is in-progress
@@ -2566,14 +2702,22 @@ Status CatalogManager::IsAlterTableDone(const IsAlterTableDoneRequestPB* req,
 }
 
 Status CatalogManager::GetTableSchema(const GetTableSchemaRequestPB* req,
-                                      GetTableSchemaResponsePB* resp) {
+                                      GetTableSchemaResponsePB* resp,
+                                      optional<const string&> user) {
   leader_lock_.AssertAcquiredForReading();
   RETURN_NOT_OK(CheckOnline());
 
-  // Lookup the table and verify if it exists
+  // Lookup the table, verify if it exists, and then check that
+  // the user is authorized to operate on the table.
   scoped_refptr<TableInfo> table;
   TableMetadataLock l;
-  RETURN_NOT_OK(FindAndLockTable(*req, resp, LockMode::READ, &table, &l));
+
+  auto authz_func = [&] (const string& username, const string& table_name) {
+    return SetupError(authz_provider_->AuthorizeGetTableMetadata(table_name, username),
+                      resp, MasterErrorPB::NOT_AUTHORIZED);
+  };
+  RETURN_NOT_OK(FindLockAndAuthorizeTable(*req, resp, LockMode::READ, authz_func, user,
+                                          &table, &l));
   RETURN_NOT_OK(CheckIfTableDeletedOrNotRunning(&l, resp));
 
   if (l.data().pb.has_fully_applied_schema()) {
@@ -2594,25 +2738,43 @@ Status CatalogManager::GetTableSchema(const GetTableSchemaRequestPB* req,
 }
 
 Status CatalogManager::ListTables(const ListTablesRequestPB* req,
-                                  ListTablesResponsePB* resp) {
+                                  ListTablesResponsePB* resp,
+                                  optional<const string&> user) {
   leader_lock_.AssertAcquiredForReading();
   RETURN_NOT_OK(CheckOnline());
 
-  shared_lock<LockType> l(lock_);
+  vector<scoped_refptr<TableInfo>> tables_info;
+  {
+    shared_lock<LockType> l(lock_);
+    for (const TableInfoMap::value_type &entry : normalized_table_names_map_) {
+      tables_info.emplace_back(entry.second);
+    }
+  }
 
-  for (const TableInfoMap::value_type& entry : normalized_table_names_map_) {
-    TableMetadataLock ltm(entry.second.get(), LockMode::READ);
+  for (const auto& table_info : tables_info) {
+    TableMetadataLock ltm(table_info.get(), LockMode::READ);
     if (!ltm.data().is_running()) continue; // implies !is_deleted() too
 
+    const string table_name = ltm.data().name();
     if (req->has_name_filter()) {
-      size_t found = ltm.data().name().find(req->name_filter());
+      size_t found = table_name.find(req->name_filter());
       if (found == string::npos) {
         continue;
       }
     }
 
+    // TODO(hao): need to improve the performance before enabling authz by default
+    // for cases where there are a lot of tables.
+    if (user) {
+      Status s = authz_provider_->AuthorizeGetTableMetadata(NormalizeTableName(table_name),
+                                                            *user);
+      if (!s.ok()) {
+        continue;
+      }
+    }
+
     ListTablesResponsePB::TableInfo* table = resp->add_tables();
-    table->set_id(entry.second->id());
+    table->set_id(table_info->id());
     table->set_name(ltm.data().name());
   }
 
@@ -3672,7 +3834,7 @@ Status CatalogManager::ProcessTabletReport(
       // where that might be possible (tablet creation timeout & replacement).
       rpcs.emplace_back(new AsyncDeleteReplica(
           master_, ts_desc->permanent_uuid(), table, tablet_id,
-          TABLET_DATA_DELETED, boost::none, msg));
+          TABLET_DATA_DELETED, none, msg));
       continue;
     }
 
@@ -4033,7 +4195,7 @@ void CatalogManager::SendDeleteTabletRequest(const scoped_refptr<TabletInfo>& ta
   for (const auto& peer : cstate.committed_config().peers()) {
     AsyncDeleteReplica* call = new AsyncDeleteReplica(
         master_, peer.permanent_uuid(), tablet->table(), tablet->id(),
-        TABLET_DATA_DELETED, boost::none, deletion_msg);
+        TABLET_DATA_DELETED, none, deletion_msg);
     tablet->table()->AddTask(call);
     WARN_NOT_OK(call->Run(), Substitute(
         "Failed to send DeleteReplica request for tablet $0", tablet->id()));
@@ -4490,7 +4652,8 @@ Status CatalogManager::BuildLocationsForTablet(
 
 Status CatalogManager::GetTabletLocations(const string& tablet_id,
                                           master::ReplicaTypeFilter filter,
-                                          TabletLocationsPB* locs_pb) {
+                                          TabletLocationsPB* locs_pb,
+                                          optional<const string&> user) {
   leader_lock_.AssertAcquiredForReading();
   RETURN_NOT_OK(CheckOnline());
 
@@ -4498,10 +4661,19 @@ Status CatalogManager::GetTabletLocations(const string& tablet_id,
   scoped_refptr<TabletInfo> tablet_info;
   {
     shared_lock<LockType> l(lock_);
+    // It's OK to return NOT_FOUND back to the client, even with authorization enabled,
+    // because tablet IDs are randomly generated and don't carry user data.
     if (!FindCopy(tablet_map_, tablet_id, &tablet_info)) {
       return Status::NotFound(Substitute("Unknown tablet $0", tablet_id));
     }
   }
+  if (user) {
+    // Acquire the table lock and then check that the user is authorized to operate on
+    // the table that the tablet belongs to.
+    TableMetadataLock table_lock(tablet_info->table().get(), LockMode::READ);
+    RETURN_NOT_OK(authz_provider_->AuthorizeGetTableMetadata(
+        NormalizeTableName(table_lock.data().name()), *user));
+  }
 
   return BuildLocationsForTablet(tablet_info, filter, locs_pb);
 }
@@ -4591,7 +4763,8 @@ Status CatalogManager::ReplaceTablet(const string& tablet_id, ReplaceTabletRespo
 }
 
 Status CatalogManager::GetTableLocations(const GetTableLocationsRequestPB* req,
-                                         GetTableLocationsResponsePB* resp) {
+                                         GetTableLocationsResponsePB* resp,
+                                         optional<const string&> user) {
   // If start-key is > end-key report an error instead of swapping the two
   // since probably there is something wrong app-side.
   if (req->has_partition_key_start() && req->has_partition_key_end()
@@ -4605,10 +4778,16 @@ Status CatalogManager::GetTableLocations(const GetTableLocationsRequestPB* req,
   leader_lock_.AssertAcquiredForReading();
   RETURN_NOT_OK(CheckOnline());
 
-  // Lookup the table and verify if it exists
+  // Lookup the table, verify if it exists, and then check that
+  // the user is authorized to operate on the table.
   scoped_refptr<TableInfo> table;
   TableMetadataLock l;
-  RETURN_NOT_OK(FindAndLockTable(*req, resp, LockMode::READ, &table, &l));
+  auto authz_func = [&] (const string& username, const string& table_name) {
+    return SetupError(authz_provider_->AuthorizeGetTableMetadata(table_name, username),
+                      resp, MasterErrorPB::NOT_AUTHORIZED);
+  };
+  RETURN_NOT_OK(FindLockAndAuthorizeTable(*req, resp, LockMode::READ, authz_func, user,
+                                          &table, &l));
   RETURN_NOT_OK(CheckIfTableDeletedOrNotRunning(&l, resp));
 
   vector<scoped_refptr<TabletInfo>> tablets_in_range;
@@ -4728,6 +4907,7 @@ template<typename RespClass>
 Status CatalogManager::WaitForNotificationLogListenerCatchUp(RespClass* resp,
                                                              rpc::RpcContext* rpc) {
   if (hms_catalog_) {
+    CHECK(rpc);
     Status s = hms_notification_log_listener_->WaitForCatchUp(rpc->GetClientDeadline());
     // ServiceUnavailable indicates the master has lost leadership.
     MasterErrorPB::Code code = s.IsServiceUnavailable() ?
diff --git a/src/kudu/master/catalog_manager.h b/src/kudu/master/catalog_manager.h
index c9ee5b7..88a0227 100644
--- a/src/kudu/master/catalog_manager.h
+++ b/src/kudu/master/catalog_manager.h
@@ -53,7 +53,7 @@
 
 namespace kudu {
 
-class AuthzTokenTest_TestSingleMasterUnavailable_Test;;
+class AuthzTokenTest_TestSingleMasterUnavailable_Test;
 class CreateTableStressTest_TestConcurrentCreateTableAndReloadMetadata_Test;
 class MonitoredTask;
 class NodeInstancePB;
@@ -68,6 +68,15 @@ namespace client {
 class ServiceUnavailableRetryClientTest_CreateTable_Test;
 } // namespace client
 
+namespace consensus {
+class RaftConsensus;
+class StartTabletCopyRequestPB;
+} // namespace consensus
+
+namespace hms {
+class HmsCatalog;
+} // namespace hms
+
 namespace rpc {
 class RpcContext;
 } // namespace rpc
@@ -78,19 +87,10 @@ class PrivateKey;
 class TokenSigningPublicKeyPB;
 } // namespace security
 
-namespace consensus {
-class RaftConsensus;
-class StartTabletCopyRequestPB;
-} // namespace consensus
-
 namespace tablet {
 class TabletReplica;
 } // namespace tablet
 
-namespace hms {
-class HmsCatalog;
-} // namespace hms
-
 namespace master {
 
 class AuthzProvider;
@@ -526,7 +526,7 @@ class CatalogManager : public tserver::TabletReplicaLookupIf {
   void Shutdown();
   Status CheckOnline() const;
 
-  // Create a new Table with the specified attributes
+  // Create a new Table with the specified attributes.
   //
   // The RPC context is provided for logging/tracing purposes,
   // but this function does not itself respond to the RPC.
@@ -534,9 +534,11 @@ class CatalogManager : public tserver::TabletReplicaLookupIf {
                      CreateTableResponsePB* resp,
                      rpc::RpcContext* rpc);
 
-  // Get the information about an in-progress create operation
+  // Get the information about an in-progress create operation. If 'user' is
+  // provided, checks that the user is authorized to get such information.
   Status IsCreateTableDone(const IsCreateTableDoneRequestPB* req,
-                           IsCreateTableDoneResponsePB* resp);
+                           IsCreateTableDoneResponsePB* resp,
+                           boost::optional<const std::string&> user);
 
   // Delete the specified table in response to a DeleteTableRequest RPC.
   //
@@ -567,36 +569,46 @@ class CatalogManager : public tserver::TabletReplicaLookupIf {
                         const std::string& new_table_name,
                         int64_t notification_log_event_id) WARN_UNUSED_RESULT;
 
-  // Get the information about an in-progress alter operation
-  //
-  // The RPC context is provided for logging/tracing purposes,
-  // but this function does not itself respond to the RPC.
+  // Get the information about an in-progress alter operation. If 'user' is
+  // provided, checks that the user is authorized to get such information.
   Status IsAlterTableDone(const IsAlterTableDoneRequestPB* req,
                           IsAlterTableDoneResponsePB* resp,
-                          rpc::RpcContext* rpc);
+                          boost::optional<const std::string&> user);
 
-  // Get the information about the specified table
+  // Get the information about the specified table. If 'user' is provided,
+  // checks that the user is authorized to get such information.
   Status GetTableSchema(const GetTableSchemaRequestPB* req,
-                        GetTableSchemaResponsePB* resp);
+                        GetTableSchemaResponsePB* resp,
+                        boost::optional<const std::string&> user);
 
-  // List all the running tables
+  // List all the running tables. If 'user' is provided, only lists the tables
+  // that the user is authorized to get metadata for; otherwise, lists all the
+  // running tables without performing any authorization check.
   Status ListTables(const ListTablesRequestPB* req,
-                    ListTablesResponsePB* resp);
+                    ListTablesResponsePB* resp,
+                    boost::optional<const std::string&> user);
 
-  // Lookup the tablets contained in the partition range of the request.
+  // Lookup the tablets contained in the partition range of the request. If 'user'
+  // is provided, checks that the user is authorized to get such information.
+  //
   // Returns an error if any of the tablets are not running.
   Status GetTableLocations(const GetTableLocationsRequestPB* req,
-                           GetTableLocationsResponsePB* resp);
+                           GetTableLocationsResponsePB* resp,
+                           boost::optional<const std::string&> user);
 
-  // Look up the locations of the given tablet. Adds only information on
-  // replicas which satisfy the 'filter'. The locations vector is overwritten
-  // (not appended to). If the tablet is not found, returns Status::NotFound.
+  // Look up the locations of the given tablet. If 'user' is provided, checks
+  // that the user is authorized to get such information. Adds only information
+  // on replicas which satisfy the 'filter'. The locations vector is overwritten
+  // (not appended to).
+  //
+  // If the tablet is not found, returns Status::NotFound.
   // If the tablet is not running, returns Status::ServiceUnavailable.
   // Otherwise, returns Status::OK and puts the result in 'locs_pb'.
   // This only returns tablets which are in RUNNING state.
   Status GetTabletLocations(const std::string& tablet_id,
                             master::ReplicaTypeFilter filter,
-                            TabletLocationsPB* locs_pb);
+                            TabletLocationsPB* locs_pb,
+                            boost::optional<const std::string&> user);
 
   // Replace the given tablet with a new, empty one. The replaced tablet is
   // deleted and its data is permanently lost.
@@ -709,21 +721,29 @@ class CatalogManager : public tserver::TabletReplicaLookupIf {
   typedef std::unordered_map<std::string, scoped_refptr<TableInfo>> TableInfoMap;
   typedef std::unordered_map<std::string, scoped_refptr<TabletInfo>> TabletInfoMap;
 
-  // Delete the specified table in the catalog.
+  // Delete the specified table in the catalog. If 'user' is provided,
+  // checks that the user is authorized to delete the table. Otherwise,
+  // it indicates it's an internal operation (originates from catalog
+  // manager).
   //
   // If a notification log event ID is provided, it will be written to the sys
   // catalog.
   Status DeleteTable(const DeleteTableRequestPB& req,
                      DeleteTableResponsePB* resp,
-                     boost::optional<int64_t> hms_notification_log_event_id) WARN_UNUSED_RESULT;
+                     boost::optional<int64_t> hms_notification_log_event_id,
+                     boost::optional<const std::string&> user) WARN_UNUSED_RESULT;
 
-  // Alter the specified table in the catalog.
+  // Alter the specified table in the catalog. If 'user' is provided,
+  // checks that the user is authorized to alter the table. Otherwise,
+  // it indicates it's an internal operation (originates from catalog
+  // manager).
   //
   // If a notification log event ID is provided, it will be written to the sys
   // catalog along with the altered table metadata.
   Status AlterTable(const AlterTableRequestPB& req,
                     AlterTableResponsePB* resp,
-                    boost::optional<int64_t> hms_notification_log_event_id) WARN_UNUSED_RESULT;
+                    boost::optional<int64_t> hms_notification_log_event_id,
+                    boost::optional<const std::string&> user) WARN_UNUSED_RESULT;
 
   // Called by SysCatalog::SysCatalogStateChanged when this node
   // becomes the leader of a consensus configuration. Executes
@@ -820,22 +840,38 @@ class CatalogManager : public tserver::TabletReplicaLookupIf {
                                              const PartitionPB& partition);
 
   // Builds the TabletLocationsPB for a tablet based on the provided TabletInfo
-  // and the replica type fiter specified. Populates locs_pb and returns
+  // and the replica type filter specified. Populates locs_pb and returns
   // Status::OK on success. Returns Status::ServiceUnavailable if tablet is
   // not running.
   Status BuildLocationsForTablet(const scoped_refptr<TabletInfo>& tablet,
                                  master::ReplicaTypeFilter filter,
                                  TabletLocationsPB* locs_pb);
 
-  // Looks up the table and locks it with the provided lock mode. If the table
-  // does not exist an error status is returned, and the appropriate error code
-  // is set in the response.
-  template<typename ReqClass, typename RespClass>
-  Status FindAndLockTable(const ReqClass& request,
-                          RespClass* response,
-                          LockMode lock_mode,
-                          scoped_refptr<TableInfo>* table_info,
-                          TableMetadataLock* table_lock) WARN_UNUSED_RESULT;
+  // Looks up the table, locks it with the provided lock mode, and, if 'user' is
+  // provided, checks that the user is authorized to operate on the table.
+  //
+  // We expect that our external authorization metadata store uses table name to
+  // identify the protected resource. Therefore, the authorization step happens
+  // after table locking to ensure the table name is consistent between authorization
+  // of a particular action on the table and the client-visible execution of that
+  // action. Consider the following scenario,
+  //  1. Alice has RENAME privileges on table B. Bob has DROP privileges on table B.
+  //  2. Alice tries to rename B to A, at the same time that Bob tries to drop B.
+  //  3. If we were to authorize before locking, there may be a privilege
+  //     escalation where Bob drops table A that he has no permission to drop.
+  //
+  // If the table does not exist, a NOT_AUTHORIZED error status is returned and the
+  // appropriate error code is set in the response, to avoid leaking whether the
+  // table exists. The one exception is when the user is authorized to operate on
+  // the table, in which case a TABLE_NOT_FOUND error status is returned.
+  template<typename ReqClass, typename RespClass, typename F>
+  Status FindLockAndAuthorizeTable(const ReqClass& request,
+                                   RespClass* response,
+                                   LockMode lock_mode,
+                                   F authz_func,
+                                   boost::optional<const std::string&> user,
+                                   scoped_refptr<TableInfo>* table_info,
+                                   TableMetadataLock* table_lock) WARN_UNUSED_RESULT;
 
   // Extract the set of tablets that must be processed because not running yet.
   void ExtractTabletsToProcess(std::vector<scoped_refptr<TabletInfo>>* tablets_to_process);
diff --git a/src/kudu/master/master-test-util.h b/src/kudu/master/master-test-util.h
index 52d8953..e440dff 100644
--- a/src/kudu/master/master-test-util.h
+++ b/src/kudu/master/master-test-util.h
@@ -21,6 +21,7 @@
 #include <algorithm>
 #include <string>
 
+#include <boost/optional/optional.hpp>
 #include <glog/logging.h>
 
 #include "kudu/common/schema.h"
@@ -53,7 +54,7 @@ Status WaitForRunningTabletCount(MiniMaster* mini_master,
     {
       CatalogManager::ScopedLeaderSharedLock l(catalog);
       RETURN_NOT_OK(l.first_failed_status());
-      RETURN_NOT_OK(catalog->GetTableLocations(&req, resp));
+      RETURN_NOT_OK(catalog->GetTableLocations(&req, resp, /*user=*/boost::none));
     }
     if (resp->tablet_locations_size() >= expected_count) {
       return Status::OK();
diff --git a/src/kudu/master/master.proto b/src/kudu/master/master.proto
index 5c38173..dd73a39 100644
--- a/src/kudu/master/master.proto
+++ b/src/kudu/master/master.proto
@@ -81,6 +81,9 @@ message MasterErrorPB {
 
     // An operation involving the Hive Metastore failed.
     HIVE_METASTORE_ERROR = 13;
+
+    // The caller is not authorized to perform the attempted operation.
+    NOT_AUTHORIZED = 14;
   }
 
   // The error code.
diff --git a/src/kudu/master/master_service.cc b/src/kudu/master/master_service.cc
index b8f4f4c..bcb244f 100644
--- a/src/kudu/master/master_service.cc
+++ b/src/kudu/master/master_service.cc
@@ -92,6 +92,7 @@ DEFINE_bool(master_support_authz_tokens, true,
             "testing version compatibility in the client.");
 TAG_FLAG(master_support_authz_tokens, hidden);
 
+using boost::make_optional;
 using google::protobuf::Message;
 using kudu::consensus::ReplicaManagementInfoPB;
 using kudu::pb_util::SecureDebugString;
@@ -306,7 +307,8 @@ void MasterServiceImpl::GetTabletLocations(const GetTabletLocationsRequestPB* re
     // TODO(todd): once we have catalog data. ACL checks would also go here, probably.
     TabletLocationsPB* locs_pb = resp->add_tablet_locations();
     Status s = server_->catalog_manager()->GetTabletLocations(
-        tablet_id, req->replica_type_filter(), locs_pb);
+        tablet_id, req->replica_type_filter(), locs_pb,
+        make_optional<const string&>(rpc->remote_user().username()));
     if (!s.ok()) {
       resp->mutable_tablet_locations()->RemoveLast();
 
@@ -340,7 +342,8 @@ void MasterServiceImpl::IsCreateTableDone(const IsCreateTableDoneRequestPB* req,
     return;
   }
 
-  Status s = server_->catalog_manager()->IsCreateTableDone(req, resp);
+  Status s = server_->catalog_manager()->IsCreateTableDone(
+      req, resp, make_optional<const string&>(rpc->remote_user().username()));
   CheckRespErrorOrSetUnknown(s, resp);
   rpc->RespondSuccess();
 }
@@ -379,7 +382,8 @@ void MasterServiceImpl::IsAlterTableDone(const IsAlterTableDoneRequestPB* req,
     return;
   }
 
-  Status s = server_->catalog_manager()->IsAlterTableDone(req, resp, rpc);
+  Status s = server_->catalog_manager()->IsAlterTableDone(
+      req, resp, make_optional<const string&>(rpc->remote_user().username()));
   CheckRespErrorOrSetUnknown(s, resp);
   rpc->RespondSuccess();
 }
@@ -392,7 +396,8 @@ void MasterServiceImpl::ListTables(const ListTablesRequestPB* req,
     return;
   }
 
-  Status s = server_->catalog_manager()->ListTables(req, resp);
+  Status s = server_->catalog_manager()->ListTables(
+      req, resp, make_optional<const string&>(rpc->remote_user().username()));
   CheckRespErrorOrSetUnknown(s, resp);
   rpc->RespondSuccess();
 }
@@ -411,7 +416,8 @@ void MasterServiceImpl::GetTableLocations(const GetTableLocationsRequestPB* req,
   if (PREDICT_FALSE(FLAGS_master_inject_latency_on_tablet_lookups_ms > 0)) {
     SleepFor(MonoDelta::FromMilliseconds(FLAGS_master_inject_latency_on_tablet_lookups_ms));
   }
-  Status s = server_->catalog_manager()->GetTableLocations(req, resp);
+  Status s = server_->catalog_manager()->GetTableLocations(
+      req, resp, make_optional<const string&>(rpc->remote_user().username()));
   CheckRespErrorOrSetUnknown(s, resp);
   rpc->RespondSuccess();
 }
@@ -424,7 +430,8 @@ void MasterServiceImpl::GetTableSchema(const GetTableSchemaRequestPB* req,
     return;
   }
 
-  Status s = server_->catalog_manager()->GetTableSchema(req, resp);
+  Status s = server_->catalog_manager()->GetTableSchema(
+      req, resp, make_optional<const string&>(rpc->remote_user().username()));
   CheckRespErrorOrSetUnknown(s, resp);
   if (resp->has_error()) {
     // If there was an application error, respond to the RPC.
diff --git a/src/kudu/master/sentry_authz_provider-test-base.h b/src/kudu/master/sentry_authz_provider-test-base.h
new file mode 100644
index 0000000..c60e367
--- /dev/null
+++ b/src/kudu/master/sentry_authz_provider-test-base.h
@@ -0,0 +1,134 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <set>
+#include <string>
+
+#include <gflags/gflags_declare.h>
+
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/sentry/mini_sentry.h"
+#include "kudu/sentry/sentry_client.h"
+#include "kudu/thrift/client.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+DECLARE_string(server_name);
+
+namespace kudu {
+namespace master {
+
+inline Status DropRole(sentry::SentryClient* sentry_client,
+                       const std::string& role_name) {
+  ::sentry::TDropSentryRoleRequest role_req;
+  role_req.__set_requestorUserName("test-admin");
+  role_req.__set_roleName(role_name);
+  return sentry_client->DropRole(role_req);
+}
+
+inline Status CreateRoleAndAddToGroups(sentry::SentryClient* sentry_client,
+                                       const std::string& role_name,
+                                       const std::string& group_name) {
+  ::sentry::TCreateSentryRoleRequest role_req;
+  role_req.__set_requestorUserName("test-admin");
+  role_req.__set_roleName(role_name);
+  RETURN_NOT_OK(sentry_client->CreateRole(role_req));
+
+  ::sentry::TSentryGroup group;
+  group.groupName = group_name;
+  std::set<::sentry::TSentryGroup> groups;
+  groups.insert(group);
+  ::sentry::TAlterSentryRoleAddGroupsRequest group_request;
+  ::sentry::TAlterSentryRoleAddGroupsResponse group_response;
+  group_request.__set_requestorUserName("test-admin");
+  group_request.__set_roleName(role_name);
+  group_request.__set_groups(groups);
+  return sentry_client->AlterRoleAddGroups(group_request, &group_response);
+}
+
+inline Status AlterRoleGrantPrivilege(sentry::SentryClient* sentry_client,
+                                      const std::string& role_name,
+                                      const ::sentry::TSentryPrivilege& privilege) {
+  ::sentry::TAlterSentryRoleGrantPrivilegeRequest privilege_request;
+  ::sentry::TAlterSentryRoleGrantPrivilegeResponse privilege_response;
+  privilege_request.__set_requestorUserName("test-admin");
+  privilege_request.__set_roleName(role_name);
+  privilege_request.__set_privilege(privilege);
+  return sentry_client->AlterRoleGrantPrivilege(privilege_request, &privilege_response);
+}
+
+// Returns a server level TSentryPrivilege with the server name, action
+// and grant option.
+inline ::sentry::TSentryPrivilege GetServerPrivilege(
+    const std::string& action,
+    const ::sentry::TSentryGrantOption::type& grant_option =
+        ::sentry::TSentryGrantOption::DISABLED) {
+  ::sentry::TSentryPrivilege privilege;
+  privilege.__set_privilegeScope("SERVER");
+  privilege.__set_serverName(FLAGS_server_name);
+  privilege.__set_action(action);
+  privilege.__set_grantOption(grant_option);
+  return privilege;
+}
+
+// Returns a database level TSentryPrivilege with the given database name, action
+// and grant option.
+inline ::sentry::TSentryPrivilege GetDatabasePrivilege(
+    const std::string& db_name,
+    const std::string& action,
+    const ::sentry::TSentryGrantOption::type& grant_option =
+        ::sentry::TSentryGrantOption::DISABLED) {
+  ::sentry::TSentryPrivilege privilege = GetServerPrivilege(action, grant_option);
+  privilege.__set_privilegeScope("DATABASE");
+  privilege.__set_dbName(db_name);
+  return privilege;
+}
+
+// Returns a table level TSentryPrivilege with the given table name, database name,
+// action and grant option.
+inline ::sentry::TSentryPrivilege GetTablePrivilege(
+    const std::string& db_name,
+    const std::string& table_name,
+    const std::string& action,
+    const ::sentry::TSentryGrantOption::type& grant_option =
+        ::sentry::TSentryGrantOption::DISABLED) {
+  ::sentry::TSentryPrivilege privilege = GetDatabasePrivilege(db_name, action, grant_option);
+  privilege.__set_privilegeScope("TABLE");
+  privilege.__set_tableName(table_name);
+  return privilege;
+}
+
+// Returns a column level TSentryPrivilege with the given column name, table name,
+// database name, action and grant option.
+inline ::sentry::TSentryPrivilege GetColumnPrivilege(
+    const std::string& db_name,
+    const std::string& table_name,
+    const std::string& column_name,
+    const std::string& action,
+    const ::sentry::TSentryGrantOption::type& grant_option =
+        ::sentry::TSentryGrantOption::DISABLED) {
+  ::sentry::TSentryPrivilege privilege = GetTablePrivilege(db_name, table_name,
+                                                 action, grant_option);
+  privilege.__set_privilegeScope("COLUMN");
+  privilege.__set_columnName(column_name);
+  return privilege;
+}
+
+} // namespace master
+} // namespace kudu
diff --git a/src/kudu/master/sentry_authz_provider-test.cc b/src/kudu/master/sentry_authz_provider-test.cc
index 23e1c2f..35199a6 100644
--- a/src/kudu/master/sentry_authz_provider-test.cc
+++ b/src/kudu/master/sentry_authz_provider-test.cc
@@ -18,7 +18,6 @@
 #include "kudu/master/sentry_authz_provider.h"
 
 #include <memory>
-#include <set>
 #include <string>
 #include <vector>
 
@@ -27,6 +26,7 @@
 
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/master/sentry_authz_provider-test-base.h"
 #include "kudu/sentry/mini_sentry.h"
 #include "kudu/sentry/sentry-test-base.h"
 #include "kudu/sentry/sentry_action.h"
@@ -43,19 +43,12 @@ DECLARE_int32(sentry_service_send_timeout_seconds);
 DECLARE_string(sentry_service_rpc_addresses);
 DECLARE_string(sentry_service_security_mode);
 DECLARE_string(server_name);
+DECLARE_string(trusted_user_acl);
 
-using sentry::TAlterSentryRoleAddGroupsRequest;
-using sentry::TAlterSentryRoleAddGroupsResponse;
-using sentry::TAlterSentryRoleGrantPrivilegeRequest;
-using sentry::TAlterSentryRoleGrantPrivilegeResponse;
-using sentry::TCreateSentryRoleRequest;
-using sentry::TDropSentryRoleRequest;
 using sentry::TSentryGrantOption;
-using sentry::TSentryGroup;
 using sentry::TSentryPrivilege;
 using std::tuple;
 using std::unique_ptr;
-using std::set;
 using std::string;
 using std::vector;
 using strings::Substitute;
@@ -68,9 +61,17 @@ using sentry::SentryAuthorizableScope;
 
 namespace master {
 
+TEST(SentryAuthzProviderStaticTest, TestTrustedUserAcl) {
+  FLAGS_trusted_user_acl = "impala,hive,hdfs";
+  SentryAuthzProvider authz_provider;
+  ASSERT_TRUE(authz_provider.IsTrustedUser("impala"));
+  ASSERT_TRUE(authz_provider.IsTrustedUser("hive"));
+  ASSERT_TRUE(authz_provider.IsTrustedUser("hdfs"));
+  ASSERT_FALSE(authz_provider.IsTrustedUser("untrusted"));
+}
+
 class SentryAuthzProviderTest : public SentryTestBase {
  public:
-  const char* const kAdminUser = "test-admin";
   const char* const kTestUser = "test-user";
   const char* const kUserGroup = "user";
   const char* const kRoleName = "developer";
@@ -97,93 +98,6 @@ class SentryAuthzProviderTest : public SentryTestBase {
     return Status::OK();
   }
 
-  Status DropRole(const string& role_name) {
-    TDropSentryRoleRequest role_req;
-    role_req.__set_requestorUserName(kAdminUser);
-    role_req.__set_roleName(role_name);
-    return sentry_client_->DropRole(role_req);
-  }
-
-  Status CreateRoleAndAddToGroups(const string& role_name, const string& group_name) {
-    TCreateSentryRoleRequest role_req;
-    role_req.__set_requestorUserName(kAdminUser);
-    role_req.__set_roleName(role_name);
-    RETURN_NOT_OK(sentry_client_->CreateRole(role_req));
-
-    TSentryGroup group;
-    group.groupName = group_name;
-    set<TSentryGroup> groups;
-    groups.insert(group);
-    TAlterSentryRoleAddGroupsRequest group_request;
-    TAlterSentryRoleAddGroupsResponse group_response;
-    group_request.__set_requestorUserName(kAdminUser);
-    group_request.__set_roleName(role_name);
-    group_request.__set_groups(groups);
-    return sentry_client_->AlterRoleAddGroups(group_request, &group_response);
-  }
-
-  Status AlterRoleGrantPrivilege(const string& role_name, const TSentryPrivilege& privilege) {
-    TAlterSentryRoleGrantPrivilegeRequest privilege_request;
-    TAlterSentryRoleGrantPrivilegeResponse privilege_response;
-    privilege_request.__set_requestorUserName(kAdminUser);
-    privilege_request.__set_roleName(role_name);
-    privilege_request.__set_privilege(privilege);
-    return sentry_client_->AlterRoleGrantPrivilege(privilege_request, &privilege_response);
-  }
-
-  // Returns a server level TSentryPrivilege with the server name, action
-  // and grant option.
-  TSentryPrivilege GetServerPrivilege(
-      const string& action,
-      TSentryGrantOption::type grant_option = TSentryGrantOption::DISABLED) {
-    TSentryPrivilege privilege;
-    privilege.__set_privilegeScope("SERVER");
-    privilege.__set_serverName(FLAGS_server_name);
-    privilege.__set_action(action);
-    privilege.__set_grantOption(grant_option);
-    return privilege;
-  }
-
-  // Returns a database level TSentryPrivilege with the given database name, action
-  // and grant option.
-  TSentryPrivilege GetDatabasePrivilege(
-      const string& db_name,
-      const string& action,
-      const TSentryGrantOption::type& grant_option = TSentryGrantOption::DISABLED) {
-    TSentryPrivilege privilege = GetServerPrivilege(action, grant_option);
-    privilege.__set_privilegeScope("DATABASE");
-    privilege.__set_dbName(db_name);
-    return privilege;
-  }
-
-  // Returns a table level TSentryPrivilege with the given table name, database name,
-  // action and grant option.
-  TSentryPrivilege GetTablePrivilege(
-      const string& db_name,
-      const string& table_name,
-      const string& action,
-      const TSentryGrantOption::type& grant_option = TSentryGrantOption::DISABLED) {
-    TSentryPrivilege privilege = GetDatabasePrivilege(db_name, action, grant_option);
-    privilege.__set_privilegeScope("TABLE");
-    privilege.__set_tableName(table_name);
-    return privilege;
-  }
-
-  // Returns a column level TSentryPrivilege with the given column name, table name,
-  // database name, action and grant option.
-  TSentryPrivilege GetColumnPrivilege(
-      const string& db_name,
-      const string& table_name,
-      const string& column_name,
-      const string& action,
-      const TSentryGrantOption::type& grant_option = TSentryGrantOption::DISABLED) {
-    TSentryPrivilege privilege = GetTablePrivilege(db_name, table_name,
-                                                   action, grant_option);
-    privilege.__set_privilegeScope("COLUMN");
-    privilege.__set_columnName(column_name);
-    return privilege;
-  }
-
  protected:
   unique_ptr<SentryAuthzProvider> sentry_authz_provider_;
 };
@@ -212,15 +126,15 @@ TEST_P(TestTableAuthorization, TestAuthorizeCreateTable) {
   ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
 
   // Don't authorize create table on a user without required privileges.
-  ASSERT_OK(CreateRoleAndAddToGroups(kRoleName, kUserGroup));
+  ASSERT_OK(CreateRoleAndAddToGroups(sentry_client_.get(), kRoleName, kUserGroup));
   TSentryPrivilege privilege = GetDatabasePrivilege("db", "DROP");
-  ASSERT_OK(AlterRoleGrantPrivilege(kRoleName, privilege));
+  ASSERT_OK(AlterRoleGrantPrivilege(sentry_client_.get(), kRoleName, privilege));
   s = sentry_authz_provider_->AuthorizeCreateTable("db.table", kTestUser, kTestUser);
   ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
 
   // Authorize create table on a user with proper privileges.
   privilege = GetDatabasePrivilege("db", "CREATE");
-  ASSERT_OK(AlterRoleGrantPrivilege(kRoleName, privilege));
+  ASSERT_OK(AlterRoleGrantPrivilege(sentry_client_.get(), kRoleName, privilege));
   ASSERT_OK(sentry_authz_provider_->AuthorizeCreateTable("db.table", kTestUser, kTestUser));
 
   // Table creation with a different owner than the user
@@ -231,35 +145,35 @@ TEST_P(TestTableAuthorization, TestAuthorizeCreateTable) {
   s = sentry_authz_provider_->AuthorizeCreateTable("db.table", kTestUser, "diff-user");
   ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
   privilege = GetDatabasePrivilege("db", "ALL", TSentryGrantOption::ENABLED);
-  ASSERT_OK(AlterRoleGrantPrivilege(kRoleName, privilege));
+  ASSERT_OK(AlterRoleGrantPrivilege(sentry_client_.get(), kRoleName, privilege));
   ASSERT_OK(sentry_authz_provider_->AuthorizeCreateTable("db.table", kTestUser, "diff-user"));
 }
 
 TEST_P(TestTableAuthorization, TestAuthorizeDropTable) {
   // Don't authorize delete table on a user without required privileges.
-  ASSERT_OK(CreateRoleAndAddToGroups(kRoleName, kUserGroup));
+  ASSERT_OK(CreateRoleAndAddToGroups(sentry_client_.get(), kRoleName, kUserGroup));
   TSentryPrivilege privilege = GetDatabasePrivilege("db", "SELECT");
-  ASSERT_OK(AlterRoleGrantPrivilege(kRoleName, privilege));
+  ASSERT_OK(AlterRoleGrantPrivilege(sentry_client_.get(), kRoleName, privilege));
   Status s = sentry_authz_provider_->AuthorizeDropTable("db.table", kTestUser);
   ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
 
   // Authorize delete table on a user with proper privileges.
   privilege = GetDatabasePrivilege("db", "DROP");
-  ASSERT_OK(AlterRoleGrantPrivilege(kRoleName, privilege));
+  ASSERT_OK(AlterRoleGrantPrivilege(sentry_client_.get(), kRoleName, privilege));
   ASSERT_OK(sentry_authz_provider_->AuthorizeDropTable("db.table", kTestUser));
 }
 
 TEST_P(TestTableAuthorization, TestAuthorizeAlterTable) {
   // Don't authorize alter table on a user without required privileges.
-  ASSERT_OK(CreateRoleAndAddToGroups(kRoleName, kUserGroup));
+  ASSERT_OK(CreateRoleAndAddToGroups(sentry_client_.get(), kRoleName, kUserGroup));
   TSentryPrivilege db_privilege = GetDatabasePrivilege("db", "SELECT");
-  ASSERT_OK(AlterRoleGrantPrivilege(kRoleName, db_privilege));
+  ASSERT_OK(AlterRoleGrantPrivilege(sentry_client_.get(), kRoleName, db_privilege));
   Status s = sentry_authz_provider_->AuthorizeAlterTable("db.table", "db.table", kTestUser);
   ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
 
   // Authorize alter table without rename on a user with proper privileges.
   db_privilege = GetDatabasePrivilege("db", "ALTER");
-  ASSERT_OK(AlterRoleGrantPrivilege(kRoleName, db_privilege));
+  ASSERT_OK(AlterRoleGrantPrivilege(sentry_client_.get(), kRoleName, db_privilege));
   ASSERT_OK(sentry_authz_provider_->AuthorizeAlterTable("db.table", "db.table", kTestUser));
 
   // Table alteration with rename requires 'ALL ON TABLE <old-table>' and
@@ -269,9 +183,9 @@ TEST_P(TestTableAuthorization, TestAuthorizeAlterTable) {
 
   // Authorize alter table without rename on a user with proper privileges.
   db_privilege = GetDatabasePrivilege("new_db", "CREATE");
-  ASSERT_OK(AlterRoleGrantPrivilege(kRoleName, db_privilege));
+  ASSERT_OK(AlterRoleGrantPrivilege(sentry_client_.get(), kRoleName, db_privilege));
   TSentryPrivilege table_privilege = GetTablePrivilege("db", "table", "ALL");
-  ASSERT_OK(AlterRoleGrantPrivilege(kRoleName, table_privilege));
+  ASSERT_OK(AlterRoleGrantPrivilege(sentry_client_.get(), kRoleName, table_privilege));
   ASSERT_OK(sentry_authz_provider_->AuthorizeAlterTable("db.table",
                                                         "new_db.new_table",
                                                         kTestUser));
@@ -279,13 +193,13 @@ TEST_P(TestTableAuthorization, TestAuthorizeAlterTable) {
 
 TEST_P(TestTableAuthorization, TestAuthorizeGetTableMetadata) {
   // Don't authorize delete table on a user without required privileges.
-  ASSERT_OK(CreateRoleAndAddToGroups(kRoleName, kUserGroup));
+  ASSERT_OK(CreateRoleAndAddToGroups(sentry_client_.get(), kRoleName, kUserGroup));
   Status s = sentry_authz_provider_->AuthorizeGetTableMetadata("db.table", kTestUser);
   ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
 
   // Authorize delete table on a user with proper privileges.
   TSentryPrivilege privilege = GetDatabasePrivilege("db", "SELECT");
-  ASSERT_OK(AlterRoleGrantPrivilege(kRoleName, privilege));
+  ASSERT_OK(AlterRoleGrantPrivilege(sentry_client_.get(), kRoleName, privilege));
   ASSERT_OK(sentry_authz_provider_->AuthorizeGetTableMetadata("db.table", kTestUser));
 }
 
@@ -302,9 +216,9 @@ TEST_P(TestTableAuthorization, TestReconnect) {
   sentry_authz_provider_.reset(new SentryAuthzProvider());
   ASSERT_OK(sentry_authz_provider_->Start());
 
-  ASSERT_OK(CreateRoleAndAddToGroups(kRoleName, kUserGroup));
+  ASSERT_OK(CreateRoleAndAddToGroups(sentry_client_.get(), kRoleName, kUserGroup));
   TSentryPrivilege privilege = GetDatabasePrivilege("db", "METADATA");
-  ASSERT_OK(AlterRoleGrantPrivilege(kRoleName, privilege));
+  ASSERT_OK(AlterRoleGrantPrivilege(sentry_client_.get(), kRoleName, privilege));
   ASSERT_OK(sentry_authz_provider_->AuthorizeGetTableMetadata("db.table", kTestUser));
 
   // Shutdown Sentry and try a few operations.
@@ -324,7 +238,7 @@ TEST_P(TestTableAuthorization, TestReconnect) {
   });
 
   privilege = GetDatabasePrivilege("db", "DROP");
-  ASSERT_OK(AlterRoleGrantPrivilege(kRoleName, privilege));
+  ASSERT_OK(AlterRoleGrantPrivilege(sentry_client_.get(), kRoleName, privilege));
   ASSERT_OK(sentry_authz_provider_->AuthorizeDropTable("db.table", kTestUser));
 
   // Pause Sentry and try a few operations.
@@ -345,19 +259,19 @@ TEST_P(TestTableAuthorization, TestReconnect) {
 }
 
 TEST_P(TestTableAuthorization, TestInvalidAction) {
-  ASSERT_OK(CreateRoleAndAddToGroups(kRoleName, kUserGroup));
+  ASSERT_OK(CreateRoleAndAddToGroups(sentry_client_.get(), kRoleName, kUserGroup));
   TSentryPrivilege privilege = GetDatabasePrivilege("db", "invalid");
-  ASSERT_OK(AlterRoleGrantPrivilege(kRoleName, privilege));
+  ASSERT_OK(AlterRoleGrantPrivilege(sentry_client_.get(), kRoleName, privilege));
   // User has privileges with invalid action cannot operate on the table.
   Status s = sentry_authz_provider_->AuthorizeCreateTable("DB.table", kTestUser, kTestUser);
   ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
 }
 
 TEST_P(TestTableAuthorization, TestInvalidAuthzScope) {
-  ASSERT_OK(CreateRoleAndAddToGroups(kRoleName, kUserGroup));
+  ASSERT_OK(CreateRoleAndAddToGroups(sentry_client_.get(), kRoleName, kUserGroup));
   TSentryPrivilege privilege = GetDatabasePrivilege("db", "ALL");
   privilege.__set_privilegeScope("invalid");
-  ASSERT_OK(AlterRoleGrantPrivilege(kRoleName, privilege));
+  ASSERT_OK(AlterRoleGrantPrivilege(sentry_client_.get(), kRoleName, privilege));
   // User has privileges with invalid authorizable scope cannot operate
   // on the table.
   Status s = sentry_authz_provider_->AuthorizeCreateTable("DB.table", kTestUser, kTestUser);
@@ -366,9 +280,9 @@ TEST_P(TestTableAuthorization, TestInvalidAuthzScope) {
 
 // Ensures Sentry privileges are case insensitive.
 TEST_P(TestTableAuthorization, TestPrivilegeCaseSensitivity) {
-  ASSERT_OK(CreateRoleAndAddToGroups(kRoleName, kUserGroup));
+  ASSERT_OK(CreateRoleAndAddToGroups(sentry_client_.get(), kRoleName, kUserGroup));
   TSentryPrivilege privilege = GetDatabasePrivilege("db", "create");
-  ASSERT_OK(AlterRoleGrantPrivilege(kRoleName, privilege));
+  ASSERT_OK(AlterRoleGrantPrivilege(sentry_client_.get(), kRoleName, privilege));
   ASSERT_OK(sentry_authz_provider_->AuthorizeCreateTable("DB.table", kTestUser, kTestUser));
 }
 
@@ -427,22 +341,22 @@ TEST_P(TestAuthzHierarchy, TestAuthorizableScope) {
   // Privilege with higher scope on the hierarchy can imply privileges
   // with lower scope on the hierarchy.
   for (const auto& privilege : higher_hierarchy_privs) {
-    ASSERT_OK(CreateRoleAndAddToGroups(kRoleName, kUserGroup));
-    ASSERT_OK(AlterRoleGrantPrivilege(kRoleName, privilege));
+    ASSERT_OK(CreateRoleAndAddToGroups(sentry_client_.get(), kRoleName, kUserGroup));
+    ASSERT_OK(AlterRoleGrantPrivilege(sentry_client_.get(), kRoleName, privilege));
     ASSERT_OK(sentry_authz_provider_->Authorize(scope, SentryAction::Action::ALL,
                                                 Substitute("$0.$1", db, tbl), kTestUser));
-    ASSERT_OK(DropRole(kRoleName));
+    ASSERT_OK(DropRole(sentry_client_.get(), kRoleName));
   }
 
   // Privilege with lower scope on the hierarchy cannot imply privileges
   // with higher scope on the hierarchy.
   for (const auto& privilege : lower_hierarchy_privs) {
-    ASSERT_OK(CreateRoleAndAddToGroups(kRoleName, kUserGroup));
-    ASSERT_OK(AlterRoleGrantPrivilege(kRoleName, privilege));
+    ASSERT_OK(CreateRoleAndAddToGroups(sentry_client_.get(), kRoleName, kUserGroup));
+    ASSERT_OK(AlterRoleGrantPrivilege(sentry_client_.get(), kRoleName, privilege));
     Status s = sentry_authz_provider_->Authorize(scope, SentryAction::Action::ALL,
                                                  Substitute("$0.$1", db, tbl), kTestUser);
     ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
-    ASSERT_OK(DropRole(kRoleName));
+    ASSERT_OK(DropRole(sentry_client_.get(), kRoleName));
   }
 }
 
diff --git a/src/kudu/master/sentry_authz_provider.cc b/src/kudu/master/sentry_authz_provider.cc
index 2c5674b..7d1a3be 100644
--- a/src/kudu/master/sentry_authz_provider.cc
+++ b/src/kudu/master/sentry_authz_provider.cc
@@ -268,6 +268,10 @@ Status SentryAuthzProvider::Authorize(SentryAuthorizableScope::Scope scope,
                                       const string& user,
                                       bool require_grant_option) {
 
+  if (AuthzProvider::IsTrustedUser(user)) {
+    return Status::OK();
+  }
+
   TSentryAuthorizable authorizable;
   RETURN_NOT_OK(GetAuthorizable(table_ident, scope, &authorizable));
 
diff --git a/src/kudu/master/sentry_authz_provider.h b/src/kudu/master/sentry_authz_provider.h
index e0f79c3..b9e290d 100644
--- a/src/kudu/master/sentry_authz_provider.h
+++ b/src/kudu/master/sentry_authz_provider.h
@@ -74,9 +74,6 @@ class SentryAuthzProvider : public AuthzProvider {
   Status AuthorizeGetTableMetadata(const std::string& table_name,
                                    const std::string& user) override WARN_UNUSED_RESULT;
 
-  // Validates the sentry_service_rpc_addresses gflag.
-  static bool ValidateAddresses(const char* flag_name, const std::string& addresses);
-
  private:
   FRIEND_TEST(TestAuthzHierarchy, TestAuthorizableScope);
 
@@ -86,6 +83,8 @@ class SentryAuthzProvider : public AuthzProvider {
   // 'TABLE' scope.
   //
   // If the operation is not authorized, returns Status::NotAuthorized().
+  // Note that the authorization process is case insensitive for the
+  // authorizables.
   Status Authorize(sentry::SentryAuthorizableScope::Scope scope,
                    sentry::SentryAction::Action action,
                    const std::string& table_ident,
diff --git a/src/kudu/sentry/mini_sentry.cc b/src/kudu/sentry/mini_sentry.cc
index 2380d4f..e122c14 100644
--- a/src/kudu/sentry/mini_sentry.cc
+++ b/src/kudu/sentry/mini_sentry.cc
@@ -204,6 +204,12 @@ Status MiniSentry::CreateSentryConfigs(const string& tmp_dir) const {
   //
   // - sentry.service.server.rpc-address
   //     IP address that the Sentry service starts with.
+  //
+  // - sentry.db.policy.store.owner.as.privilege
+  //     Configures Sentry to enable the owner privileges feature, which
+  //     automatically derives OWNER/ALL privileges from an object's ownership.
+  //     'all' indicates that an object owner has OWNER/ALL privilege on the
+  //     object, but cannot transfer owner privileges to another user or role.
   static const string kFileTemplate = R"(
 <configuration>
 
@@ -272,6 +278,11 @@ Status MiniSentry::CreateSentryConfigs(const string& tmp_dir) const {
     <value>kudu,hive</value>
   </property>
 
+  <property>
+    <name>sentry.db.policy.store.owner.as.privilege</name>
+    <value>all</value>
+  </property>
+
 </configuration>
   )";
 
diff --git a/src/kudu/tools/rebalancer_tool-test.cc b/src/kudu/tools/rebalancer_tool-test.cc
index 7c89418..4e432ed 100644
--- a/src/kudu/tools/rebalancer_tool-test.cc
+++ b/src/kudu/tools/rebalancer_tool-test.cc
@@ -31,6 +31,7 @@
 #include <utility>
 #include <vector>
 
+#include <boost/optional/optional.hpp>
 #include <gflags/gflags_declare.h>
 #include <glog/logging.h>
 #include <gtest/gtest.h>
@@ -72,6 +73,7 @@ class Schema;
 DECLARE_int32(num_replicas);
 DECLARE_int32(num_tablet_servers);
 
+using boost::none;
 using kudu::client::KuduClient;
 using kudu::client::KuduColumnSchema;
 using kudu::client::KuduSchema;
@@ -1290,6 +1292,7 @@ TEST_F(LocationAwareRebalancingBasicTest, Basic) {
     ASSERT_OK(itest::GetTableLocations(cluster_->master_proxy(),
                                        table_name, MonoDelta::FromSeconds(30),
                                        master::ANY_REPLICA,
+                                       /*table_id=*/none,
                                        &table_locations));
     const auto tablet_num = table_locations.tablet_locations_size();
     auto total_table_replica_count = 0;


Mime
View raw message