carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jack...@apache.org
Subject [09/50] [abbrv] carbondata git commit: [CARBONDATA-2001] Unable to Save DataFrame As Carbondata stream table
Date Wed, 31 Jan 2018 05:22:29 GMT
[CARBONDATA-2001] Unable to Save DataFrame As Carbondata stream table

1. Added a table property for streaming in CarbonDataFrameWriter
2. Added a test case for the same

This closes #1774


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/bef6af30
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/bef6af30
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/bef6af30

Branch: refs/heads/carbonstore
Commit: bef6af30e5959a6edbaf95cd004da90ad1e6d646
Parents: bcc9cf0
Author: anubhav100 <anubhav.tarar@knoldus.in>
Authored: Mon Jan 8 13:17:36 2018 +0530
Committer: Jacky Li <jacky.likun@qq.com>
Committed: Fri Jan 19 21:27:58 2018 +0800

----------------------------------------------------------------------
 .../testsuite/dataload/TestLoadDataFrame.scala    | 18 +++++++++++++++++-
 .../apache/spark/sql/CarbonDataFrameWriter.scala  | 12 +++++++++---
 2 files changed, 26 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/bef6af30/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
index 574eb91..6f03493 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
@@ -73,6 +73,7 @@ class TestLoadDataFrame extends QueryTest with BeforeAndAfterAll {
     sql("DROP TABLE IF EXISTS carbon8")
     sql("DROP TABLE IF EXISTS carbon9")
     sql("DROP TABLE IF EXISTS carbon10")
+    sql("DROP TABLE IF EXISTS carbon11")
     sql("DROP TABLE IF EXISTS df_write_sort_column_not_specified")
     sql("DROP TABLE IF EXISTS df_write_specify_sort_column")
     sql("DROP TABLE IF EXISTS df_write_empty_sort_column")
@@ -244,7 +245,22 @@ test("test the boolean data type"){
       .message
       .contains("not found"))
   }
-
+  test("test streaming Table") {
+    dataFrame.write
+      .format("carbondata")
+      .option("tableName", "carbon11")
+      .option("tempCSV", "true")
+      .option("single_pass", "false")
+      .option("compress", "false")
+      .option("streaming", "true")
+      .mode(SaveMode.Overwrite)
+      .save()
+    checkAnswer(
+      sql("SELECT decimal FROM carbon11"),Seq(Row(BigDecimal.valueOf(10000.00)),Row(BigDecimal.valueOf(1234.44))))
+    val descResult =sql("desc formatted carbon11")
+    val isStreaming: String = descResult.collect().find(row=>row(0).asInstanceOf[String].trim.equalsIgnoreCase("streaming")).get.get(1).asInstanceOf[String]
+    assert(isStreaming.contains("true"))
+  }
   private def getSortColumnValue(tableName: String): Array[String] = {
     val desc = sql(s"desc formatted $tableName")
     val sortColumnRow = desc.collect.find(r =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bef6af30/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
index d50f0b8..2b06375 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
@@ -167,19 +167,25 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame)
{
     val carbonSchema = schema.map { field =>
       s"${ field.name } ${ convertToCarbonType(field.dataType) }"
     }
+
     val property = Map(
       "SORT_COLUMNS" -> options.sortColumns,
       "DICTIONARY_INCLUDE" -> options.dictionaryInclude,
       "DICTIONARY_EXCLUDE" -> options.dictionaryExclude,
-      "TABLE_BLOCKSIZE" -> options.tableBlockSize
-    ).filter(_._2.isDefined).map(p => s"'${p._1}' = '${p._2.get}'").mkString(",")
+      "TABLE_BLOCKSIZE" -> options.tableBlockSize,
+      "STREAMING" -> Option(options.isStreaming.toString)
+    ).filter(_._2.isDefined)
+      .map(property => s"'${property._1}' = '${property._2.get}'").mkString(",")
+
     val dbName = CarbonEnv.getDatabaseName(options.dbName)(sqlContext.sparkSession)
+
     s"""
        | CREATE TABLE IF NOT EXISTS $dbName.${options.tableName}
        | (${ carbonSchema.mkString(", ") })
        | STORED BY 'carbondata'
-       | ${ if (property.nonEmpty) "TBLPROPERTIES (" + property + ")" else "" }
        | ${ if (options.tablePath.nonEmpty) s"LOCATION '${options.tablePath.get}'" else ""}
+       |  ${ if (property.nonEmpty) "TBLPROPERTIES (" + property + ")" else "" }
+       |
      """.stripMargin
   }
 


Mime
View raw message