carbondata-commits mailing list archives

From: ravipes...@apache.org
Subject: carbondata git commit: [CARBONDATA-2057] Support specifying path when creating pre-aggregate table
Date: Fri, 19 Jan 2018 10:19:21 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master aac7af733 -> 9882a74c8


[CARBONDATA-2057] Support specifying path when creating pre-aggregate table

When creating a pre-aggregate datamap, the user should be able to specify where its data is persisted. This commit adds support for a 'path' property in DMPROPERTIES.
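For example, mirroring the new test in this commit (table, datamap, and path names are illustrative):

    CREATE TABLE main(year INT, month INT, name STRING, salary INT)
    STORED BY 'carbondata'
    TBLPROPERTIES('sort_columns'='month,year,name');

    -- the pre-aggregate table's files are written under './_pre-agg_test'
    CREATE DATAMAP preagg ON TABLE main
    USING 'preaggregate'
    DMPROPERTIES('path'='./_pre-agg_test')
    AS SELECT name, AVG(salary) FROM main GROUP BY name;

Dropping the datamap ('drop datamap preagg on table main') also removes the directory at that path, as asserted in the test.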

This closes #1835


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/9882a74c
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/9882a74c
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/9882a74c

Branch: refs/heads/master
Commit: 9882a74c8843085cc13756a03025f163829e194e
Parents: aac7af7
Author: Jacky Li <jacky.likun@qq.com>
Authored: Fri Jan 19 14:48:36 2018 +0800
Committer: ravipesala <ravi.pesala@gmail.com>
Committed: Fri Jan 19 15:49:01 2018 +0530

----------------------------------------------------------------------
 .../testsuite/datamap/TestDataMapCommand.scala  | 27 +++++++++++++
 .../datamap/CarbonDropDataMapCommand.scala      | 40 ++++++++------------
 .../CreatePreAggregateTableCommand.scala        |  5 ++-
 3 files changed, 47 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/9882a74c/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
index d0a342b..f3458e2 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
@@ -17,6 +17,8 @@
 
 package org.apache.carbondata.spark.testsuite.datamap
 
+import java.io.{File, FilenameFilter}
+
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
@@ -24,6 +26,7 @@ import org.scalatest.BeforeAndAfterAll
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.CarbonMetadata
 import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
 
 class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
 
@@ -207,6 +210,30 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
       Seq(Row(1, 31), Row(2, 27), Row(3, 70), Row(4, 55)))
   }
 
+  test("create pre-agg table with path") {
+    sql("drop table if exists main_preagg")
+    sql("drop table if exists main ")
+    val path = "./_pre-agg_test"
+    sql("create table main(year int,month int,name string,salary int) stored by 'carbondata' tblproperties('sort_columns'='month,year,name')")
+    sql("insert into main select 10,11,'amy',12")
+    sql("insert into main select 10,11,'amy',14")
+    sql("create datamap preagg on table main " +
+        "using 'preaggregate' " +
+        s"dmproperties ('path'='$path') " +
+        "as select name,avg(salary) from main group by name")
+    assertResult(true)(new File(path).exists())
+    assertResult(true)(new File(s"${CarbonTablePath.getSegmentPath(path, "0")}")
+                         .list(new FilenameFilter {
+                           override def accept(dir: File, name: String): Boolean = {
+                             name.contains(CarbonCommonConstants.FACT_FILE_EXT)
+                           }
+                         }).length > 0)
+    checkAnswer(sql("select name,avg(salary) from main group by name"), Row("amy", 13.0))
+    checkAnswer(sql("select * from main_preagg"), Row("amy", 26, 2))
+    sql("drop datamap preagg on table main")
+    assertResult(false)(new File(path).exists())
+    sql("drop table main")
+  }
 
   override def afterAll {
     sql("DROP TABLE IF EXISTS maintable")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9882a74c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
index 59aa322..e545b0b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
@@ -51,10 +51,11 @@ case class CarbonDropDataMapCommand(
     tableName: String)
   extends AtomicRunnableCommand {
 
+  var commandToRun: CarbonDropTableCommand = _
+
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
     val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
-    val identifier = TableIdentifier(tableName, Option(dbName))
     val locksToBeAcquired = List(LockUsage.METADATA_LOCK)
     val carbonEnv = CarbonEnv.getInstance(sparkSession)
     val catalog = carbonEnv.carbonMetastore
@@ -68,18 +69,12 @@ case class CarbonDropDataMapCommand(
         lock => carbonLocks += CarbonLockUtil.getLockObject(tableIdentifier, lock)
       }
       LOGGER.audit(s"Deleting datamap [$dataMapName] under table [$tableName]")
-      var carbonTable: Option[CarbonTable] =
-        catalog.getTableFromMetadataCache(dbName, tableName)
-      if (carbonTable.isEmpty) {
-        try {
-          carbonTable = Some(catalog.lookupRelation(identifier)(sparkSession)
-            .asInstanceOf[CarbonRelation].metaData.carbonTable)
-        } catch {
-          case ex: NoSuchTableException =>
-            if (!ifExistsSet) {
-              throw ex
-            }
-        }
+      val carbonTable: Option[CarbonTable] = try {
+        Some(CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession))
+      } catch {
+        case ex: NoSuchTableException =>
+          if (!ifExistsSet) throw ex
+          else None
       }
       if (carbonTable.isDefined && carbonTable.get.getTableInfo.getDataMapSchemaList.size() > 0) {
         val dataMapSchema = carbonTable.get.getTableInfo.getDataMapSchemaList.asScala.zipWithIndex.
@@ -104,12 +99,12 @@ case class CarbonDropDataMapCommand(
               tableName))(sparkSession)
           if (dataMapSchema.isDefined) {
             if (dataMapSchema.get._1.getRelationIdentifier != null) {
-              CarbonDropTableCommand(
+              commandToRun = CarbonDropTableCommand(
                 ifExistsSet = true,
                 Some(dataMapSchema.get._1.getRelationIdentifier.getDatabaseName),
                 dataMapSchema.get._1.getRelationIdentifier.getTableName,
-                dropChildTable = true
-              ).processMetadata(sparkSession)
+                dropChildTable = true)
+              commandToRun.processMetadata(sparkSession)
             }
           }
           // fires the event after dropping datamap from main table schema
@@ -143,14 +138,11 @@ case class CarbonDropDataMapCommand(
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     // delete the table folder
-    val tableIdentifier = CarbonEnv.getIdentifier(databaseNameOp, tableName)(sparkSession)
-    DataMapStoreManager.getInstance().clearDataMap(tableIdentifier, dataMapName)
-    CarbonDropTableCommand(
-      ifExistsSet = true,
-      databaseNameOp,
-      dataMapName,
-      dropChildTable = true
-    ).processData(sparkSession)
+    if (commandToRun != null) {
+      DataMapStoreManager.getInstance().clearDataMap(
+        commandToRun.carbonTable.getAbsoluteTableIdentifier, dataMapName)
+      commandToRun.processData(sparkSession)
+    }
     Seq.empty
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9882a74c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
index 8b11548..933bf91 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
@@ -105,8 +105,11 @@ case class CreatePreAggregateTableCommand(
     }
     tableModel.parentTable = Some(parentTable)
     tableModel.dataMapRelation = Some(fieldRelationMap)
-    val tablePath =
+    val tablePath = if (dmProperties.contains("path")) {
+      dmProperties("path")
+    } else {
       CarbonEnv.getTablePath(tableModel.databaseNameOp, tableModel.tableName)(sparkSession)
+    }
     CarbonCreateTableCommand(TableNewProcessor(tableModel),
       tableModel.ifNotExistsSet, Some(tablePath)).run(sparkSession)
 

