kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a...@apache.org
Subject [kudu] 02/02: KUDU-2921: Exposing the table statistics to spark relation.
Date Tue, 10 Sep 2019 03:54:45 GMT
This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit e233e7c127db1f593be1b0f477267c060f7dd6f3
Author: triplesheep <triplesheep0419@gmail.com>
AuthorDate: Tue Aug 20 15:05:12 2019 +0800

    KUDU-2921: Exposing the table statistics to spark relation.
    
    Exposing current table statistics to spark via rpc from client to master.
    
    Change-Id: I7742a76708f989b0ccc8ba417f3390013e260175
    Reviewed-on: http://gerrit.cloudera.org:8080/14107
    Reviewed-by: Adar Dembo <adar@cloudera.com>
    Tested-by: Adar Dembo <adar@cloudera.com>
---
 .../org/apache/kudu/client/AsyncKuduClient.java    | 16 +++++
 .../kudu/client/GetTableStatisticsRequest.java     | 76 ++++++++++++++++++++++
 .../kudu/client/GetTableStatisticsResponse.java    | 59 +++++++++++++++++
 .../java/org/apache/kudu/client/KuduClient.java    | 11 ++++
 .../java/org/apache/kudu/client/KuduTable.java     |  8 +++
 .../apache/kudu/client/KuduTableStatistics.java    | 58 +++++++++++++++++
 .../java/org/apache/kudu/client/TestKuduTable.java | 38 +++++++++++
 .../org/apache/kudu/spark/kudu/DefaultSource.scala | 26 ++++++++
 .../apache/kudu/spark/kudu/DefaultSourceTest.scala | 73 +++++++++++++++++++++
 src/kudu/master/authz_provider.h                   |  8 +++
 src/kudu/master/catalog_manager.cc                 | 42 ++++++++++++
 src/kudu/master/catalog_manager.h                  |  6 ++
 src/kudu/master/default_authz_provider.h           |  5 ++
 src/kudu/master/master-test.cc                     | 28 ++++++++
 src/kudu/master/master.proto                       | 18 +++++
 src/kudu/master/master_service.cc                  | 13 ++++
 src/kudu/master/master_service.h                   |  6 ++
 src/kudu/master/sentry_authz_provider-test.cc      | 13 ++++
 src/kudu/master/sentry_authz_provider.cc           |  9 +++
 src/kudu/master/sentry_authz_provider.h            |  3 +
 20 files changed, 516 insertions(+)

diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index 4482356..ba3824a 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -808,6 +808,22 @@ public class AsyncKuduClient implements AutoCloseable {
   }
 
   /**
+   * Get table's statistics from master.
+   * @param name the table's name
+   * @return an deferred KuduTableStatistics
+   */
+  public Deferred<KuduTableStatistics> getTableStatistics(String name) {
+    GetTableStatisticsRequest rpc = new GetTableStatisticsRequest(this.masterTable,
+                                                                  name,
+                                                                  timer,
+                                                                  defaultAdminOperationTimeoutMs);
+
+    return sendRpcToTablet(rpc).addCallback(resp -> {
+      return new KuduTableStatistics(resp.getOnDiskSize(), resp.getLiveRowCount());
+    });
+  }
+
+  /**
    * Test if a table exists.
    * @param name a non-null table name
    * @return true if the table exists, else false
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableStatisticsRequest.java b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableStatisticsRequest.java
new file mode 100644
index 0000000..e541f39
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableStatisticsRequest.java
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.client;
+
+import com.google.protobuf.Message;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.jboss.netty.util.Timer;
+
+import org.apache.kudu.master.Master;
+import org.apache.kudu.util.Pair;
+
+@InterfaceAudience.Private
+class GetTableStatisticsRequest extends KuduRpc<GetTableStatisticsResponse> {
+
+  static final String GET_TABLE_STATISTICS = "GetTableStatistics";
+
+  private final String name;
+
+  GetTableStatisticsRequest(KuduTable table,
+                            String name,
+                            Timer timer,
+                            long timeoutMillis) {
+    super(table, timer, timeoutMillis);
+    this.name = name;
+  }
+
+  @Override
+  Message createRequestPB() {
+    final Master.GetTableStatisticsRequestPB.Builder builder =
+        Master.GetTableStatisticsRequestPB.newBuilder();
+    Master.TableIdentifierPB tableID =
+        Master.TableIdentifierPB.newBuilder().setTableName(name).build();
+    builder.setTable(tableID);
+    return builder.build();
+  }
+
+  @Override
+  String serviceName() {
+    return MASTER_SERVICE_NAME;
+  }
+
+  @Override
+  String method() {
+    return GET_TABLE_STATISTICS;
+  }
+
+  @Override
+  Pair<GetTableStatisticsResponse, Object> deserialize(CallResponse callResponse,
+                                                       String tsUUID) throws KuduException {
+    final Master.GetTableStatisticsResponsePB.Builder respBuilder =
+        Master.GetTableStatisticsResponsePB.newBuilder();
+    readProtobuf(callResponse.getPBMessage(), respBuilder);
+    GetTableStatisticsResponse response = new GetTableStatisticsResponse(
+        timeoutTracker.getElapsedMillis(),
+        tsUUID,
+        respBuilder.getOnDiskSize(),
+        respBuilder.getLiveRowCount());
+    return new Pair<GetTableStatisticsResponse, Object>(
+        response, respBuilder.hasError() ? respBuilder.getError() : null);
+  }
+}
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableStatisticsResponse.java b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableStatisticsResponse.java
new file mode 100644
index 0000000..92f46f5
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableStatisticsResponse.java
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.client;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+class GetTableStatisticsResponse extends KuduRpcResponse {
+
+  private final long onDiskSize;
+  private final long liveRowCount;
+
+
+  /**
+   * @param elapsedMillis Time in milliseconds since RPC creation to now
+   * @param tsUUID the UUID of the tablet server that sent the response
+   * @param onDiskSize the table's on disk size
+   * @param liveRowCount the table's live row count
+   */
+  GetTableStatisticsResponse(long elapsedMillis,
+                             String tsUUID,
+                             long onDiskSize,
+                             long liveRowCount) {
+    super(elapsedMillis, tsUUID);
+    this.onDiskSize = onDiskSize;
+    this.liveRowCount = liveRowCount;
+  }
+
+  /**
+   * Get the table's on disk size, this statistic is pre-replication.
+   * @return Table's on disk size
+   */
+  public long getOnDiskSize() {
+    return onDiskSize;
+  }
+
+  /**
+   * Get the table's live row count, this statistic is pre-replication.
+   * @return Table's live row count
+   */
+  public long getLiveRowCount() {
+    return liveRowCount;
+  }
+}
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java
index acb2b69..4e3a705 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java
@@ -219,6 +219,17 @@ public class KuduClient implements AutoCloseable {
   }
 
   /**
+   * Get table's statistics from master.
+   * @param name the table's name
+   * @return the statistics of table
+   * @throws KuduException if anything went wrong
+   */
+  public KuduTableStatistics getTableStatistics(String name) throws KuduException {
+    Deferred<KuduTableStatistics> d = asyncClient.getTableStatistics(name);
+    return joinAndHandleException(d);
+  }
+
+  /**
    * Test if a table exists.
    * @param name a non-null table name
    * @return true if the table exists, else false
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTable.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTable.java
index 5acb72d..29aa4a7 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTable.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTable.java
@@ -281,4 +281,12 @@ public class KuduTable {
     }
     return rangePartitions;
   }
+
+  /**
+   * Get this table's statistics.
+   * @return this table's statistics
+   */
+  public KuduTableStatistics getTableStatistics() throws KuduException {
+    return client.syncClient().getTableStatistics(name);
+  }
 }
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTableStatistics.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTableStatistics.java
new file mode 100644
index 0000000..3ebbae9
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTableStatistics.java
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.client;
+
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+/**
+ * Represent statistics belongs to a specific kudu table.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class KuduTableStatistics {
+
+  private final long onDiskSize;
+  private final long liveRowCount;
+
+  /**
+   * @param onDiskSize the table's on disk size
+   * @param liveRowCount the table's live row count
+   */
+  KuduTableStatistics(long onDiskSize,
+                      long liveRowCount) {
+    this.onDiskSize = onDiskSize;
+    this.liveRowCount = liveRowCount;
+  }
+
+  /**
+   * Get the table's on disk size, this statistic is pre-replication.
+   * @return Table's on disk size
+   */
+  public long getOnDiskSize() {
+    return onDiskSize;
+  }
+
+  /**
+   * Get the table's live row count, this statistic is pre-replication.
+   * @return Table's live row count
+   */
+  public long getLiveRowCount() {
+    return liveRowCount;
+  }
+}
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTable.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTable.java
index 064b3cc..43758f4 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTable.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTable.java
@@ -779,4 +779,42 @@ public class TestKuduTable {
     assertEquals(9, dimensionMap.get("labelA").intValue());
     assertEquals(3, dimensionMap.get("labelB").intValue());
   }
+
+  @Test(timeout = 100000)
+  @KuduTestHarness.TabletServerConfig(flags = {
+      "--update_tablet_stats_interval_ms=200",
+      "--heartbeat_interval_ms=100",
+  })
+  public void testGetTableStatistics() throws Exception {
+    // Create a table.
+    CreateTableOptions builder = getBasicCreateTableOptions();
+    KuduTable table = client.createTable(tableName, BASIC_SCHEMA, builder);
+
+    // Insert some rows and test the statistics.
+    KuduTableStatistics prevStatistics = new KuduTableStatistics(-1, -1);
+    KuduTableStatistics currentStatistics = new KuduTableStatistics(-1, -1);
+    KuduSession session = client.newSession();
+    int num = 100;
+    for (int i = 0; i < num; ++i) {
+      // Get current table statistics.
+      currentStatistics = table.getTableStatistics();
+      assertTrue(currentStatistics.getOnDiskSize() >= prevStatistics.getOnDiskSize());
+      assertTrue(currentStatistics.getLiveRowCount() >= prevStatistics.getLiveRowCount());
+      assertTrue(currentStatistics.getLiveRowCount() <= i + 1);
+      prevStatistics = currentStatistics;
+      // Insert row.
+      Insert insert = createBasicSchemaInsert(table, i);
+      session.apply(insert);
+      List<String> rows = scanTableToStrings(table);
+      assertEquals("wrong number of rows", i + 1, rows.size());
+    }
+
+    // Final accuracy test.
+    // Wait for master to aggregate table statistics.
+    Thread.sleep(200 * 6);
+    currentStatistics = table.getTableStatistics();
+    assertTrue(currentStatistics.getOnDiskSize() >= prevStatistics.getOnDiskSize());
+    assertTrue(currentStatistics.getLiveRowCount() >= prevStatistics.getLiveRowCount());
+    assertTrue(currentStatistics.getLiveRowCount() == num);
+  }
 }
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
index fcbafc2..74da812 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
@@ -30,6 +30,8 @@ import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.SaveMode
 import org.apache.yetus.audience.InterfaceAudience
 import org.apache.yetus.audience.InterfaceStability
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
 import org.apache.kudu.client.KuduPredicate.ComparisonOp
 import org.apache.kudu.client._
 import org.apache.kudu.spark.kudu.KuduReadOptions._
@@ -254,12 +256,36 @@ class KuduRelation(
     val readOptions: KuduReadOptions = new KuduReadOptions,
     val writeOptions: KuduWriteOptions = new KuduWriteOptions)(val sqlContext: SQLContext)
     extends BaseRelation with PrunedFilteredScan with InsertableRelation {
+  val log: Logger = LoggerFactory.getLogger(getClass)
 
   private val context: KuduContext =
     new KuduContext(masterAddrs, sqlContext.sparkContext)
 
   private val table: KuduTable = context.syncClient.openTable(tableName)
 
+  private val estimatedSize: Long = {
+    try {
+      table.getTableStatistics().getOnDiskSize
+    } catch {
+      case e: Exception =>
+        log.warn("Error while getting table statistic from master, maybe the current" +
+          " master doesn't support the rpc, please check the version.", e)
+        super.sizeInBytes
+    }
+  }
+
+  /**
+   * Estimated size of this relation in bytes, this information is used by spark to
+   * decide whether it is safe to broadcast a relation such as in join selection. It
+   * is always better to overestimate this size than underestimate, because underestimation
+   * may lead to expensive execution plan such as broadcasting a very large table which
+   * will cause great network bandwidth consumption.
+   * TODO(KUDU-2933): Consider projection and predicates in size estimation.
+   *
+   * @return size of this relation in bytes
+   */
+  override def sizeInBytes: Long = estimatedSize
+
   override def unhandledFilters(filters: Array[Filter]): Array[Filter] =
     filters.filterNot(KuduRelation.supportsFilter)
 
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
index 4c7e04d..7dc96d2 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
@@ -34,8 +34,10 @@ import org.apache.kudu.Schema
 import org.apache.kudu.Type
 import org.apache.kudu.test.RandomUtils
 import org.apache.kudu.spark.kudu.SparkListenerUtil.withJobTaskCounter
+import org.apache.kudu.test.KuduTestHarness.MasterServerConfig
 import org.apache.kudu.test.KuduTestHarness.TabletServerConfig
 import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
 import org.junit.Before
 import org.junit.Test
 
@@ -1053,4 +1055,75 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
     }
     assert(actualNumTasks > 2)
   }
+
+  @Test
+  @MasterServerConfig(
+    flags = Array(
+      "--mock_table_metrics_for_testing=true",
+      "--on_disk_size_for_testing=1024",
+      "--live_row_count_for_testing=100"
+    ))
+  def testGetTableStatistics(): Unit = {
+    val dataFrame = sqlContext.read.options(kuduOptions).format("kudu").load
+    val kuduRelation = kuduRelationFromDataFrame(dataFrame)
+    assert(kuduRelation.sizeInBytes == 1024)
+  }
+
+  @Test
+  @MasterServerConfig(
+    flags = Array(
+      "--mock_table_metrics_for_testing=true",
+      "--on_disk_size_for_testing=1024",
+      "--live_row_count_for_testing=100"
+    ))
+  def testJoinWithTableStatistics(): Unit = {
+    val df = sqlContext.read.options(kuduOptions).format("kudu").load
+
+    // 1. Create two tables.
+    val table1 = "table1"
+    kuduContext.createTable(
+      table1,
+      df.schema,
+      Seq("key"),
+      new CreateTableOptions()
+        .setRangePartitionColumns(List("key").asJava)
+        .setNumReplicas(1))
+    var options1: Map[String, String] =
+      Map("kudu.table" -> table1, "kudu.master" -> harness.getMasterAddressesAsString)
+    df.write.options(options1).mode("append").format("kudu").save
+    val df1 = sqlContext.read.options(options1).format("kudu").load
+    df1.createOrReplaceTempView(table1)
+
+    val table2 = "table2"
+    kuduContext.createTable(
+      table2,
+      df.schema,
+      Seq("key"),
+      new CreateTableOptions()
+        .setRangePartitionColumns(List("key").asJava)
+        .setNumReplicas(1))
+    var options2: Map[String, String] =
+      Map("kudu.table" -> table2, "kudu.master" -> harness.getMasterAddressesAsString)
+    df.write.options(options2).mode("append").format("kudu").save
+    val df2 = sqlContext.read.options(options2).format("kudu").load
+    df2.createOrReplaceTempView(table2)
+
+    // 2. Get the table statistics of each table and verify.
+    val relation1 = kuduRelationFromDataFrame(df1)
+    val relation2 = kuduRelationFromDataFrame(df2)
+    assert(relation1.sizeInBytes == relation2.sizeInBytes)
+    assert(relation1.sizeInBytes == 1024)
+
+    // 3. Test join with table size should be able to broadcast.
+    val sqlStr = s"SELECT * FROM $table1 JOIN $table2 ON $table1.key = $table2.key"
+    var physical = sqlContext.sql(sqlStr).queryExecution.sparkPlan
+    var operators = physical.collect {
+      case j: BroadcastHashJoinExec => j
+    }
+    assert(operators.size == 1)
+
+    // Verify result.
+    var results = sqlContext.sql(sqlStr).collectAsList()
+    assert(results.size() == rowCount)
+  }
 }
diff --git a/src/kudu/master/authz_provider.h b/src/kudu/master/authz_provider.h
index 51ba635..48023e5 100644
--- a/src/kudu/master/authz_provider.h
+++ b/src/kudu/master/authz_provider.h
@@ -95,6 +95,14 @@ class AuthzProvider {
                                      std::unordered_set<std::string>* table_names,
                                      bool* checked_table_names) WARN_UNUSED_RESULT = 0;
 
+  // Checks if statistics of the table is authorized for the
+  // given user.
+  //
+  // If the operation is not authorized, returns Status::NotAuthorized().
+  // Otherwise, may return other Status error codes depend on actual errors.
+  virtual Status AuthorizeGetTableStatistics(const std::string& table_name,
+                                             const std::string& user) WARN_UNUSED_RESULT = 0;
+
   // Populates the privilege fields of 'pb' with the table-specific privileges
   // for the given user, using 'schema_pb' for metadata (e.g. column IDs). This
   // does not populate the table ID field of 'pb' -- only the privilege fields;
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index 22792c2..d9cd3ab 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -268,6 +268,21 @@ DEFINE_int32(catalog_manager_inject_latency_list_authz_ms, 0,
 TAG_FLAG(catalog_manager_inject_latency_list_authz_ms, hidden);
 TAG_FLAG(catalog_manager_inject_latency_list_authz_ms, unsafe);
 
+DEFINE_bool(mock_table_metrics_for_testing, false,
+            "Whether to enable mock table metrics for testing.");
+TAG_FLAG(mock_table_metrics_for_testing, hidden);
+TAG_FLAG(mock_table_metrics_for_testing, runtime);
+
+DEFINE_int64(on_disk_size_for_testing, 0,
+             "Mock the on disk size of metrics for testing.");
+TAG_FLAG(on_disk_size_for_testing, hidden);
+TAG_FLAG(on_disk_size_for_testing, runtime);
+
+DEFINE_int64(live_row_count_for_testing, 0,
+             "Mock the live row count of metrics for testing.");
+TAG_FLAG(live_row_count_for_testing, hidden);
+TAG_FLAG(live_row_count_for_testing, runtime);
+
 DECLARE_bool(raft_prepare_replacement_before_eviction);
 DECLARE_bool(raft_attempt_to_replace_replica_without_majority);
 DECLARE_int64(tsk_rotation_seconds);
@@ -2894,6 +2909,32 @@ Status CatalogManager::ListTables(const ListTablesRequestPB* req,
   return Status::OK();
 }
 
+Status CatalogManager::GetTableStatistics(const GetTableStatisticsRequestPB* req,
+                                          GetTableStatisticsResponsePB* resp,
+                                          optional<const string&> user) {
+  leader_lock_.AssertAcquiredForReading();
+  RETURN_NOT_OK(CheckOnline());
+
+  scoped_refptr<TableInfo> table;
+  TableMetadataLock l;
+  auto authz_func = [&] (const string& username, const string& table_name) {
+      return SetupError(authz_provider_->AuthorizeGetTableStatistics(table_name, username),
+                        resp, MasterErrorPB::NOT_AUTHORIZED);
+  };
+  RETURN_NOT_OK(FindLockAndAuthorizeTable(*req, resp, LockMode::READ, authz_func, user,
+                                          &table, &l));
+
+  int64_t on_disk_size = table->GetMetrics()->on_disk_size->value();
+  int64_t live_row_count = table->GetMetrics()->live_row_count->value();
+  if (FLAGS_mock_table_metrics_for_testing) {
+    on_disk_size = FLAGS_on_disk_size_for_testing;
+    live_row_count = FLAGS_live_row_count_for_testing;
+  }
+  resp->set_on_disk_size(on_disk_size);
+  resp->set_live_row_count(live_row_count);
+  return Status::OK();
+}
+
 Status CatalogManager::GetTableInfo(const string& table_id, scoped_refptr<TableInfo> *table) {
   leader_lock_.AssertAcquiredForReading();
   RETURN_NOT_OK(CheckOnline());
@@ -5236,6 +5277,7 @@ INITTED_AND_LEADER_OR_RESPOND(IsCreateTableDoneResponsePB);
 INITTED_AND_LEADER_OR_RESPOND(ListTablesResponsePB);
 INITTED_AND_LEADER_OR_RESPOND(GetTableLocationsResponsePB);
 INITTED_AND_LEADER_OR_RESPOND(GetTableSchemaResponsePB);
+INITTED_AND_LEADER_OR_RESPOND(GetTableStatisticsResponsePB);
 INITTED_AND_LEADER_OR_RESPOND(GetTabletLocationsResponsePB);
 INITTED_AND_LEADER_OR_RESPOND(ReplaceTabletResponsePB);
 
diff --git a/src/kudu/master/catalog_manager.h b/src/kudu/master/catalog_manager.h
index 3e89815..e7fc11c 100644
--- a/src/kudu/master/catalog_manager.h
+++ b/src/kudu/master/catalog_manager.h
@@ -633,6 +633,12 @@ class CatalogManager : public tserver::TabletReplicaLookupIf {
                     ListTablesResponsePB* resp,
                     boost::optional<const std::string&> user);
 
+  // Get table statistics. If 'user' is provided, checks if the user is
+  // authorized to get such statistics.
+  Status GetTableStatistics(const GetTableStatisticsRequestPB* req,
+                            GetTableStatisticsResponsePB* resp,
+                            boost::optional<const std::string&> user);
+
   // Lookup the tablets contained in the partition range of the request. If 'user'
   // is provided, checks that the user is authorized to get such information.
   //
diff --git a/src/kudu/master/default_authz_provider.h b/src/kudu/master/default_authz_provider.h
index 9db2f5a..55b3311 100644
--- a/src/kudu/master/default_authz_provider.h
+++ b/src/kudu/master/default_authz_provider.h
@@ -73,6 +73,11 @@ class DefaultAuthzProvider : public AuthzProvider {
     return Status::OK();
   }
 
+  Status AuthorizeGetTableStatistics(const std::string& /*table_name*/,
+                                     const std::string& /*user*/) override WARN_UNUSED_RESULT {
+    return Status::OK();
+  }
+
   Status FillTablePrivilegePB(const std::string& /*table_name*/,
                               const std::string& /*user*/,
                               const SchemaPB& /*schema_pb*/,
diff --git a/src/kudu/master/master-test.cc b/src/kudu/master/master-test.cc
index 7cbbcb5..bc35210 100644
--- a/src/kudu/master/master-test.cc
+++ b/src/kudu/master/master-test.cc
@@ -105,10 +105,13 @@ using strings::Substitute;
 
 DECLARE_bool(catalog_manager_check_ts_count_for_create_table);
 DECLARE_bool(master_support_authz_tokens);
+DECLARE_bool(mock_table_metrics_for_testing);
 DECLARE_bool(raft_prepare_replacement_before_eviction);
 DECLARE_double(sys_catalog_fail_during_write);
 DECLARE_int32(diagnostics_log_stack_traces_interval_ms);
 DECLARE_int32(master_inject_latency_on_tablet_lookups_ms);
+DECLARE_int64(live_row_count_for_testing);
+DECLARE_int64(on_disk_size_for_testing);
 DECLARE_string(location_mapping_cmd);
 
 namespace kudu {
@@ -1772,6 +1775,31 @@ TEST_F(MasterTest, TestTableIdentifierWithIdAndName) {
   }
 }
 
+TEST_F(MasterTest, TestGetTableStatistics) {
+  const char *kTableName = "testtable";
+  const Schema kTableSchema({ ColumnSchema("key", INT32) }, 1);
+  ASSERT_OK(CreateTable(kTableName, kTableSchema));
+
+  // Get table statistics with right name.
+  GetTableStatisticsRequestPB req;
+  GetTableStatisticsResponsePB resp;
+  RpcController controller;
+  req.mutable_table()->set_table_name(kTableName);
+  ASSERT_OK(proxy_->GetTableStatistics(req, &resp, &controller));
+  ASSERT_FALSE(resp.has_error()) << resp.error().DebugString();
+  ASSERT_EQ(0, resp.on_disk_size());
+  ASSERT_EQ(0, resp.live_row_count());
+
+  FLAGS_mock_table_metrics_for_testing = true;
+  FLAGS_on_disk_size_for_testing = 1024;
+  FLAGS_live_row_count_for_testing = 100;
+  controller.Reset();
+  ASSERT_OK(proxy_->GetTableStatistics(req, &resp, &controller));
+  ASSERT_FALSE(resp.has_error()) << resp.error().DebugString();
+  ASSERT_EQ(FLAGS_on_disk_size_for_testing, resp.on_disk_size());
+  ASSERT_EQ(FLAGS_live_row_count_for_testing, resp.live_row_count());
+}
+
 class AuthzTokenMasterTest : public MasterTest,
                              public ::testing::WithParamInterface<bool> {};
 
diff --git a/src/kudu/master/master.proto b/src/kudu/master/master.proto
index fc5fef7..73a656d 100644
--- a/src/kudu/master/master.proto
+++ b/src/kudu/master/master.proto
@@ -535,6 +535,19 @@ message ListTablesResponsePB {
   repeated TableInfo tables = 2;
 }
 
+message GetTableStatisticsRequestPB {
+  required TableIdentifierPB table = 1;
+}
+
+message GetTableStatisticsResponsePB {
+  // The error, if an error occurred with this request.
+  optional MasterErrorPB error = 1;
+
+  // The table statistics from table metrics.
+  optional uint64 on_disk_size = 2;
+  optional uint64 live_row_count = 3;
+}
+
 message GetTableLocationsRequestPB {
   required TableIdentifierPB table = 1;
 
@@ -919,6 +932,11 @@ service MasterService {
   rpc ListTables(ListTablesRequestPB) returns (ListTablesResponsePB) {
     option (kudu.rpc.authz_method) = "AuthorizeClient";
   }
+
+  rpc GetTableStatistics(GetTableStatisticsRequestPB) returns (GetTableStatisticsResponsePB) {
+    option (kudu.rpc.authz_method) = "AuthorizeClient";
+  }
+
   rpc GetTableLocations(GetTableLocationsRequestPB) returns (GetTableLocationsResponsePB) {
     option (kudu.rpc.authz_method) = "AuthorizeClient";
   }
diff --git a/src/kudu/master/master_service.cc b/src/kudu/master/master_service.cc
index 9b9af6d..20fa876 100644
--- a/src/kudu/master/master_service.cc
+++ b/src/kudu/master/master_service.cc
@@ -411,6 +411,19 @@ void MasterServiceImpl::ListTables(const ListTablesRequestPB* req,
   rpc->RespondSuccess();
 }
 
+void MasterServiceImpl::GetTableStatistics(const GetTableStatisticsRequestPB* req,
+                                           GetTableStatisticsResponsePB* resp,
+                                           rpc::RpcContext* rpc) {
+  CatalogManager::ScopedLeaderSharedLock l(server_->catalog_manager());
+  if (!l.CheckIsInitializedAndIsLeaderOrRespond(resp, rpc)) {
+    return;
+  }
+  Status s = server_->catalog_manager()->GetTableStatistics(
+      req, resp, make_optional<const string&>(rpc->remote_user().username()));
+  CheckRespErrorOrSetUnknown(s, resp);
+  rpc->RespondSuccess();
+}
+
 void MasterServiceImpl::GetTableLocations(const GetTableLocationsRequestPB* req,
                                           GetTableLocationsResponsePB* resp,
                                           rpc::RpcContext* rpc) {
diff --git a/src/kudu/master/master_service.h b/src/kudu/master/master_service.h
index 5d6846e..83dee04 100644
--- a/src/kudu/master/master_service.h
+++ b/src/kudu/master/master_service.h
@@ -50,6 +50,8 @@ class GetTableLocationsRequestPB;
 class GetTableLocationsResponsePB;
 class GetTableSchemaRequestPB;
 class GetTableSchemaResponsePB;
+class GetTableStatisticsRequestPB;
+class GetTableStatisticsResponsePB;
 class GetTabletLocationsRequestPB;
 class GetTabletLocationsResponsePB;
 class IsAlterTableDoneRequestPB;
@@ -132,6 +134,10 @@ class MasterServiceImpl : public MasterServiceIf {
                   ListTablesResponsePB* resp,
                   rpc::RpcContext* rpc) override;
 
+  void GetTableStatistics(const GetTableStatisticsRequestPB* req,
+                          GetTableStatisticsResponsePB* resp,
+                          rpc::RpcContext* rpc) override;
+
   void GetTableLocations(const GetTableLocationsRequestPB* req,
                          GetTableLocationsResponsePB* resp,
                          rpc::RpcContext* rpc) override;
diff --git a/src/kudu/master/sentry_authz_provider-test.cc b/src/kudu/master/sentry_authz_provider-test.cc
index a3a694c..7efe19d 100644
--- a/src/kudu/master/sentry_authz_provider-test.cc
+++ b/src/kudu/master/sentry_authz_provider-test.cc
@@ -812,6 +812,19 @@ TEST_F(SentryAuthzProviderTest, TestAuthorizeAlterTable) {
                                                         kTestUser));
 }
 
+TEST_F(SentryAuthzProviderTest, TestAuthorizeGetTableStatistics) {
+  // Don't authorize getting statistics of a table for a user without required
+  // privileges.
+  ASSERT_OK(CreateRoleAndAddToGroups());
+  Status s = sentry_authz_provider_->AuthorizeGetTableStatistics("db.table", kTestUser);
+  ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
+
+  // Authorize get table statistics on a user with proper privileges.
+  TSentryPrivilege privilege = GetDatabasePrivilege("db", "SELECT");
+  ASSERT_OK(AlterRoleGrantPrivilege(privilege));
+  ASSERT_OK(sentry_authz_provider_->AuthorizeGetTableStatistics("db.table", kTestUser));
+}
+
 TEST_F(SentryAuthzProviderTest, TestAuthorizeGetTableMetadata) {
   // Don't authorize getting metadata on a table for a user without required
   // privileges.
diff --git a/src/kudu/master/sentry_authz_provider.cc b/src/kudu/master/sentry_authz_provider.cc
index 355bdae..b9705c0 100644
--- a/src/kudu/master/sentry_authz_provider.cc
+++ b/src/kudu/master/sentry_authz_provider.cc
@@ -253,6 +253,15 @@ Status SentryAuthzProvider::AuthorizeListTables(const string& user,
   return Status::OK();
 }
 
+Status SentryAuthzProvider::AuthorizeGetTableStatistics(const std::string& table_name,
+                                                        const std::string& user) {
+  // Statistics contain data (e.g. number of rows) that requires the 'SELECT ON TABLE'
+  // privilege.
+  return Authorize(SentryAuthorizableScope::Scope::TABLE,
+                   SentryAction::Action::SELECT,
+                   table_name, user);
+}
+
 Status SentryAuthzProvider::FillTablePrivilegePB(const string& table_name,
                                                  const string& user,
                                                  const SchemaPB& schema_pb,
diff --git a/src/kudu/master/sentry_authz_provider.h b/src/kudu/master/sentry_authz_provider.h
index 7f1496f..27c0fa1 100644
--- a/src/kudu/master/sentry_authz_provider.h
+++ b/src/kudu/master/sentry_authz_provider.h
@@ -96,6 +96,9 @@ class SentryAuthzProvider : public AuthzProvider {
                              std::unordered_set<std::string>* table_names,
                              bool* checked_table_names) override WARN_UNUSED_RESULT;
 
+  Status AuthorizeGetTableStatistics(const std::string& table_name,
+                                     const std::string& user) override WARN_UNUSED_RESULT;
+
   Status FillTablePrivilegePB(const std::string& table_name,
                               const std::string& user,
                               const SchemaPB& schema_pb,


Mime
View raw message