carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jack...@apache.org
Subject [47/50] [abbrv] incubator-carbondata git commit: Added options to include and exclude dictionary columns in dataframe
Date Tue, 02 May 2017 14:13:11 GMT
Added options to include and exclude dictionary columns in dataframe


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/1814048e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/1814048e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/1814048e

Branch: refs/heads/12-dev
Commit: 1814048ee9edbe64f483a3212258a39efb9a58b0
Parents: 0287e1e
Author: Sanoj MG <sanoj.george.dev@gmail.com>
Authored: Tue Apr 18 17:22:14 2017 +0800
Committer: jackylk <jacky.likun@huawei.com>
Committed: Tue May 2 09:43:27 2017 +0800

----------------------------------------------------------------------
 .../testsuite/dataload/TestLoadDataFrame.scala  | 48 +++++++++++++++++++-
 .../spark/CarbonDataFrameWriter.scala           |  5 ++
 .../spark/sql/CarbonDataFrameWriter.scala       | 29 ++----------
 3 files changed, 57 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1814048e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
index 3b0fd4a..6e79a10 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
@@ -27,6 +27,7 @@ import org.scalatest.BeforeAndAfterAll
 class TestLoadDataFrame extends QueryTest with BeforeAndAfterAll {
   var df: DataFrame = _
   var dataFrame: DataFrame = _
+  var df2: DataFrame = _
 
 
   def buildTestData() = {
@@ -45,6 +46,9 @@ class TestLoadDataFrame extends QueryTest with BeforeAndAfterAll {
       StructField("string", StringType, nullable = false) :: Nil)
 
     dataFrame = sqlContext.createDataFrame(rdd, schema)
+    df2 = sqlContext.sparkContext.parallelize(1 to 1000)
+      .map(x => ("key_" + x, "str_" + x, x, x * 2, x * 3))
+      .toDF("c1", "c2", "c3", "c4", "c5")
   }
 
   def dropTable() = {
@@ -52,7 +56,9 @@ class TestLoadDataFrame extends QueryTest with BeforeAndAfterAll {
     sql("DROP TABLE IF EXISTS carbon2")
     sql("DROP TABLE IF EXISTS carbon3")
     sql("DROP TABLE IF EXISTS carbon4")
-
+    sql("DROP TABLE IF EXISTS carbon5")
+    sql("DROP TABLE IF EXISTS carbon6")
+    sql("DROP TABLE IF EXISTS carbon7")
   }
 
 
@@ -114,6 +120,46 @@ class TestLoadDataFrame extends QueryTest with BeforeAndAfterAll {
       sql("SELECT decimal FROM carbon4"),Seq(Row(BigDecimal.valueOf(10000.00)),Row(BigDecimal.valueOf(1234.44))))
   }
 
+  test("test load dataframe with integer columns included in the dictionary"){
+    df2.write
+      .format("carbondata")
+      .option("tableName", "carbon5")
+      .option("compress", "true")
+      .option("dictionary_include","c3,c4")
+      .mode(SaveMode.Overwrite)
+      .save()
+    checkAnswer(
+      sql("select count(*) from carbon5 where c3 > 300"), Row(700)
+    )
+  }
+
+  test("test load dataframe with string column excluded from the dictionary"){
+    df2.write
+      .format("carbondata")
+      .option("tableName", "carbon6")
+      .option("compress", "true")
+      .option("dictionary_exclude","c2")
+      .mode(SaveMode.Overwrite)
+      .save()
+    checkAnswer(
+      sql("select count(*) from carbon6 where c3 > 300"), Row(700)
+    )
+  }
+
+  test("test load dataframe with both dictionary include and exclude specified"){
+    df2.write
+      .format("carbondata")
+      .option("tableName", "carbon7")
+      .option("compress", "true")
+      .option("dictionary_include","c3,c4")
+      .option("dictionary_exclude","c2")
+      .mode(SaveMode.Overwrite)
+      .save()
+    checkAnswer(
+      sql("select count(*) from carbon7 where c3 > 300"), Row(700)
+    )
+  }
+
   override def afterAll {
     dropTable
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1814048e/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
index 0e2e4dd..9f813a8 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
@@ -175,6 +175,10 @@ class CarbonDataFrameWriter(val dataFrame: DataFrame) {
   }
 
   private def makeCreateTableString(schema: StructType, options: CarbonOption): String =
{
+    val properties = Map(
+      "DICTIONARY_INCLUDE" -> options.dictionaryInclude,
+      "DICTIONARY_EXCLUDE" -> options.dictionaryExclude
+    ).filter(_._2.isDefined).map(p => s"'${p._1}' = '${p._2.get}'").mkString(",")
     val carbonSchema = schema.map { field =>
       s"${ field.name } ${ convertToCarbonType(field.dataType) }"
     }
@@ -182,6 +186,7 @@ class CarbonDataFrameWriter(val dataFrame: DataFrame) {
           CREATE TABLE IF NOT EXISTS ${options.dbName}.${options.tableName}
           (${ carbonSchema.mkString(", ") })
           STORED BY '${ CarbonContext.datasourceName }'
+          ${ if (properties.nonEmpty) " TBLPROPERTIES (" + properties + ")" else ""}
       """
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1814048e/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
index 465fad0..7cd5af6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
@@ -162,30 +162,11 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame)
{
     val carbonSchema = schema.map { field =>
       s"${ field.name } ${ convertToCarbonType(field.dataType) }"
     }
-    var property = new StringBuilder
-    property.append(
-      if (options.dictionaryInclude.isDefined) {
-        s"'DICTIONARY_INCLUDE' = '${options.dictionaryInclude.get}' ,"
-      } else {
-        ""
-      }
-    ).append(
-      if (options.dictionaryExclude.isDefined) {
-        s"'DICTIONARY_EXCLUDE' = '${options.dictionaryExclude.get}' ,"
-      } else {
-        ""
-      }
-    ).append(
-      if (options.tableBlockSize.isDefined) {
-        s"'table_blocksize' = '${options.tableBlockSize.get}'"
-      } else {
-        ""
-      }
-    )
-    if (property.nonEmpty && property.charAt(property.length-1) == ',') {
-      property = property.replace(property.length - 1, property.length, "")
-    }
-
+    val property = Map(
+      "DICTIONARY_INCLUDE" -> options.dictionaryInclude,
+      "DICTIONARY_EXCLUDE" -> options.dictionaryExclude,
+      "TABLE_BLOCKSIZE" -> options.tableBlockSize
+    ).filter(_._2.isDefined).map(p => s"'${p._1}' = '${p._2.get}'").mkString(",")
     s"""
        | CREATE TABLE IF NOT EXISTS ${options.dbName}.${options.tableName}
        | (${ carbonSchema.mkString(", ") })


Mime
View raw message