carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jack...@apache.org
Subject carbondata git commit: [CARBONDATA-1812] Provide API to get table dynamic information(table size and last modified time)
Date Mon, 27 Nov 2017 11:37:02 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master 6d9cec91b -> e19ada792


[CARBONDATA-1812] Provide API to get table dynamic information(table size and last modified
time)

provide API to get table dynamic information (table size and last modified time)

This closes #1572


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/e19ada79
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/e19ada79
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/e19ada79

Branch: refs/heads/master
Commit: e19ada792dbdc8b1083c423b0cee66fb8d5a91a4
Parents: 6d9cec9
Author: QiangCai <qiangcai@qq.com>
Authored: Mon Nov 27 10:01:44 2017 +0800
Committer: Jacky Li <jacky.likun@qq.com>
Committed: Mon Nov 27 19:36:46 2017 +0800

----------------------------------------------------------------------
 .../core/metadata/schema/table/CarbonTable.java | 15 +++++
 .../schema/CarbonGetTableDetailCommand.scala    | 65 ++++++++++++++++++++
 .../TestStreamingTableOperation.scala           |  2 +-
 .../CarbonGetTableDetailComandTestCase.scala    | 54 ++++++++++++++++
 4 files changed, 135 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/e19ada79/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index ac580cd..97a9445 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -17,6 +17,7 @@
 
 package org.apache.carbondata.core.metadata.schema.table;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.*;
 
@@ -31,6 +32,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonImplicitDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.util.CarbonUtil;
 
 /**
  * Mapping class for Carbon actual table
@@ -715,4 +717,17 @@ public class CarbonTable implements Serializable {
     return null != tableInfo.getParentRelationIdentifiers()
         && !tableInfo.getParentRelationIdentifiers().isEmpty();
   }
+
+  public long size() throws IOException {
+    Map<String, Long> dataIndexSize = CarbonUtil.calculateDataIndexSize(this);
+    Long dataSize = dataIndexSize.get(CarbonCommonConstants.CARBON_TOTAL_DATA_SIZE);
+    if (dataSize == null) {
+      dataSize = 0L;
+    }
+    Long indexSize = dataIndexSize.get(CarbonCommonConstants.CARBON_TOTAL_INDEX_SIZE);
+    if (indexSize == null) {
+      indexSize = 0L;
+    }
+    return dataSize + indexSize;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e19ada79/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonGetTableDetailCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonGetTableDetailCommand.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonGetTableDetailCommand.scala
new file mode 100644
index 0000000..49831ef
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonGetTableDetailCommand.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.schema
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.execution.command.{DataProcessCommand, RunnableCommand}
+import org.apache.spark.sql.types.{LongType, StringType}
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+
/**
 * Collects dynamic detail information for each of the given tables: the table
 * size in bytes and the last modified time of the table status file.
 */
case class CarbonGetTableDetailCommand(
    databaseName: String,
    tableNames: Seq[String])
  extends RunnableCommand with DataProcessCommand {

  override def run(sparkSession: SparkSession): Seq[Row] = processData(sparkSession)

  override def processData(sparkSession: SparkSession): Seq[Row] = {
    val storePath = CarbonProperties.getStorePath
    tableNames.map { tableName =>
      // Resolve the table's absolute identifier so the status file's
      // last-modified time can be read from the store.
      val identifier =
        AbsoluteTableIdentifier.from(storePath, databaseName.toLowerCase, tableName.toLowerCase)
      val carbonTableIdentifier = identifier.getCarbonTableIdentifier
      val carbonTable = CarbonEnv.getCarbonTable(
        Option(carbonTableIdentifier.getDatabaseName),
        carbonTableIdentifier.getTableName)(sparkSession)
      val lastModifiedTime =
        SegmentStatusManager.getTableStatusLastModifiedTime(identifier)
      // One output row per table: (name, size in bytes, last modified time).
      Row(tableName, carbonTable.size, lastModifiedTime)
    }
  }

  override def output: Seq[Attribute] = Seq(
    AttributeReference("table name", StringType, nullable = false)(),
    AttributeReference("table size", LongType, nullable = false)(),
    AttributeReference("last modified time", LongType, nullable = false)())
}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e19ada79/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index 3b9b2c3..add4fca 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -342,7 +342,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll
{
       rowNumsEachBatch = 100,
       intervalOfSource = 3,
       intervalOfIngest = 5,
-      continueSeconds = 20,
+      continueSeconds = 30,
       generateBadRecords = false,
       badRecordAction = "force"
     )

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e19ada79/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala
b/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala
new file mode 100644
index 0000000..ca9e111
--- /dev/null
+++ b/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.execution.command.schema.CarbonGetTableDetailCommand
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
/**
 * Verifies that CarbonGetTableDetailCommand reports one row per requested
 * table containing the table name and its size in bytes.
 */
class CarbonGetTableDetailCommandTestCase extends QueryTest with BeforeAndAfterAll {

  // Create and load two identical tables so both are expected to have the
  // same size after loading the same CSV file.
  override def beforeAll(): Unit = {
    sql("drop table if exists table_info1")
    sql("create table table_info1 (empno int, workgroupcategory string, deptno int, projectcode int, attendance int) stored by 'org.apache.carbondata.format'")
    sql(s"""load data local inpath '$resourcesPath/data.csv' into table table_info1 options('delimiter'=',', 'quotechar'='\"', 'fileheader'='')""")

    sql("drop table if exists table_info2")
    sql("create table table_info2 (empno int, workgroupcategory string, deptno int, projectcode int, attendance int) stored by 'org.apache.carbondata.format'")
    sql(s"""load data local inpath '$resourcesPath/data.csv' into table table_info2 options('delimiter'=',', 'quotechar'='\"', 'fileheader'='')""")
  }

  test("collect the information of tables") {
    val logicalPlan = CarbonGetTableDetailCommand("default", Seq("table_info1", "table_info2"))
    val result = new QueryExecution(sqlContext.sparkSession, logicalPlan)
      .executedPlan
      .execute
      .collect

    // Row layout: (table name: String, table size: Long, last modified time: Long).
    assertResult(2)(result.length)
    assertResult("table_info1")(result(0).getString(0))
    // 2136 bytes = data size + index size produced by loading data.csv.
    assertResult(2136)(result(0).getLong(1))
    assertResult("table_info2")(result(1).getString(0))
    assertResult(2136)(result(1).getLong(1))
  }

  override def afterAll(): Unit = {
    sql("drop table if exists table_info1")
    sql("drop table if exists table_info2")
  }
}


Mime
View raw message