carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jack...@apache.org
Subject [02/15] carbondata git commit: [CARBONDATA-1261] Load data sql add 'header' option
Date Tue, 25 Jul 2017 17:37:39 GMT
[CARBONDATA-1261] Load data sql add 'header' option

This closes #1133


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/0934e44c
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/0934e44c
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/0934e44c

Branch: refs/heads/master
Commit: 0934e44c7f40413c670758ce80eb663ad59617da
Parents: 9e064ee
Author: QiangCai <david.caiq@gmail.com>
Authored: Tue Jul 4 12:11:33 2017 +0800
Committer: Raghunandan S <carbondatacontributions@gmail.com>
Committed: Tue Jul 25 17:28:04 2017 +0800

----------------------------------------------------------------------
 .../TestLoadDataWithFileHeaderException.scala   | 101 ++++++++++++++++++-
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala |   2 +-
 .../execution/command/carbonTableSchema.scala   |  31 +++++-
 .../execution/command/carbonTableSchema.scala   |  32 +++++-
 4 files changed, 162 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/0934e44c/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithFileHeaderException.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithFileHeaderException.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithFileHeaderException.scala
index 04ccc87..3d3fb0c 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithFileHeaderException.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithFileHeaderException.scala
@@ -43,7 +43,7 @@ class TestLoadDataWithFileHeaderException extends QueryTest with BeforeAndAfterA
     }
   }
 
-  test("test load data ddl provided  wrong file header exception") {
+  test("test load data ddl provided wrong file header exception") {
     try {
       sql(s"""
            LOAD DATA LOCAL INPATH '$resourcesPath/source_without_header.csv' into table t3
@@ -56,6 +56,105 @@ class TestLoadDataWithFileHeaderException extends QueryTest with BeforeAndAfterA
     }
   }
 
+  test("test load data with wrong header , but without fileheader") {
+    try {
+      sql(s"""
+           LOAD DATA LOCAL INPATH '$resourcesPath/source.csv' into table t3
+           options('header'='abc')
+           """)
+      assert(false)
+    } catch {
+      case e: Exception =>
+        assert(e.getMessage.contains("'header' option should be either 'true' or 'false'"))
+    }
+  }
+
+  test("test load data with wrong header and fileheader") {
+    try {
+      sql(s"""
+         LOAD DATA LOCAL INPATH '$resourcesPath/source_without_header.csv' into table t3
+         options('header'='', 'fileheader'='ID,date,country,name,phonetype,serialname,salary')
+         """)
+      assert(false)
+    } catch {
+      case e: Exception =>
+        assert(e.getMessage.contains("'header' option should be either 'true' or 'false'"))
+    }
+  }
+
+  test("test load data with header=false, but without fileheader") {
+    sql(s"""
+         LOAD DATA LOCAL INPATH '$resourcesPath/source_without_header.csv' into table t3
+         options('header'='False')
+         """)
+  }
+
+  test("test load data with header=false and fileheader") {
+    sql(s"""
+         LOAD DATA LOCAL INPATH '$resourcesPath/source_without_header.csv' into table t3
+         options('header'='false', 'fileheader'='ID,date,country,name,phonetype,serialname,salary')
+         """)
+  }
+
+  test("test load data with header=false and wrong fileheader") {
+    try {
+      sql(s"""
+        LOAD DATA LOCAL INPATH '$resourcesPath/source_without_header.csv' into table t3
+        options('header'='false', 'fileheader'='ID1,date2,country,name,phonetype,serialname,salary')
+        """)
+      assert(false)
+    } catch {
+      case e: Exception =>
+        assert(e.getMessage.contains("CSV header in DDL is not proper. Column names in schema and CSV header are not the same"))
+    }
+  }
+
+  test("test load data with header=true, but without fileheader") {
+    sql(s"""
+         LOAD DATA LOCAL INPATH '$resourcesPath/source.csv' into table t3
+         options('header'='True')
+         """)
+  }
+
+  test("test load data with header=true and fileheader") {
+    try {
+      sql(s"""
+           LOAD DATA LOCAL INPATH '$resourcesPath/source.csv' into table t3
+           options('header'='true', 'fileheader'='ID,date,country,name,phonetype,serialname,salary')
+           """)
+      assert(false)
+    } catch {
+      case e: Exception =>
+        assert(e.getMessage.contains("When 'header' option is true, 'fileheader' option is not required."))
+    }
+  }
+
+  test("test load data with header=true and wrong fileheader") {
+    try {
+      sql(s"""
+           LOAD DATA LOCAL INPATH '$resourcesPath/source.csv' into table t3
+           options('header'='true', 'fileheader'='ID1,date1,country,name,phonetype,serialname,salary')
+           """)
+      assert(false)
+    } catch {
+      case e: Exception =>
+        assert(e.getMessage.contains("When 'header' option is true, 'fileheader' option is not required."))
+    }
+  }
+
+  test("test load data without header and fileheader") {
+    sql(s"""
+         LOAD DATA LOCAL INPATH '$resourcesPath/source.csv' into table t3
+         """)
+  }
+
+  test("test load data without header, but with fileheader") {
+    sql(s"""
+         LOAD DATA LOCAL INPATH '$resourcesPath/source_without_header.csv' into table t3
+         options('fileheader'='ID,date,country,name,phonetype,serialname,salary')
+         """)
+  }
+
   override def afterAll {
     sql("DROP TABLE IF EXISTS t3")
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0934e44c/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 474af08..3704f8a 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -840,7 +840,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
       "SERIALIZATION_NULL_FORMAT", "BAD_RECORDS_LOGGER_ENABLE", "BAD_RECORDS_ACTION",
       "ALL_DICTIONARY_PATH", "MAXCOLUMNS", "COMMENTCHAR", "DATEFORMAT", "BAD_RECORD_PATH",
       "SINGLE_PASS", "IS_EMPTY_DATA_BAD_RECORD", "SORT_SCOPE", "BATCH_SORT_SIZE_INMB",
-      "GLOBAL_SORT_PARTITIONS"
+      "GLOBAL_SORT_PARTITIONS", "HEADER"
     )
     var isSupported = true
     val invalidOptions = StringBuilder.newBuilder

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0934e44c/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 70c8407..44d5efb 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -419,7 +419,7 @@ case class LoadTable(
 
       val delimiter = options.getOrElse("delimiter", ",")
       val quoteChar = options.getOrElse("quotechar", "\"")
-      val fileHeader = options.getOrElse("fileheader", "")
+      var fileHeader = options.getOrElse("fileheader", "")
       val escapeChar = options.getOrElse("escapechar", "\\")
       val commentchar = options.getOrElse("commentchar", "#")
       val columnDict = options.getOrElse("columndict", null)
@@ -441,6 +441,35 @@ case class LoadTable(
       val batchSortSizeInMB = options.getOrElse("batch_sort_size_inmb", null)
       val globalSortPartitions = options.getOrElse("global_sort_partitions", null)
       ValidateUtil.validateGlobalSortPartitions(globalSortPartitions)
+
+      // if there isn't file header in csv file and load sql doesn't provide FILEHEADER option,
+      // we should use table schema to generate file header.
+      val headerOption = options.get("header")
+      if (headerOption.isDefined) {
+        // whether the csv file has file header
+        // the default value is true
+        val header = try {
+          headerOption.get.toBoolean
+        } catch {
+          case ex: IllegalArgumentException =>
+            throw new MalformedCarbonCommandException(
+              "'header' option should be either 'true' or 'false'. " + ex.getMessage)
+        }
+        header match {
+          case true =>
+            if (fileHeader.nonEmpty) {
+              throw new MalformedCarbonCommandException(
+                "When 'header' option is true, 'fileheader' option is not required.")
+            }
+          case false =>
+            // generate file header
+            if (fileHeader.isEmpty) {
+              fileHeader = table.getCreateOrderColumn(table.getFactTableName)
+                .asScala.map(_.getColName).mkString(",")
+            }
+        }
+      }
+
       val bad_record_path = options.getOrElse("bad_record_path",
           CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
             CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0934e44c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index ee0b8a6..0391cb3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -537,6 +537,36 @@ case class LoadTable(
       carbonLoadModel.setEscapeChar(checkDefaultValue(optionsFinal.get("escapechar").get, "\\"))
       carbonLoadModel.setQuoteChar(checkDefaultValue(optionsFinal.get("quotechar").get, "\""))
       carbonLoadModel.setCommentChar(checkDefaultValue(optionsFinal.get("commentchar").get, "#"))
+
+      // if there isn't file header in csv file and load sql doesn't provide FILEHEADER option,
+      // we should use table schema to generate file header.
+      var fileHeader = optionsFinal.get("fileheader").get
+      val headerOption = options.get("header")
+      if (headerOption.isDefined) {
+        // whether the csv file has file header
+        // the default value is true
+        val header = try {
+          headerOption.get.toBoolean
+        } catch {
+          case ex: IllegalArgumentException =>
+            throw new MalformedCarbonCommandException(
+              "'header' option should be either 'true' or 'false'. " + ex.getMessage)
+        }
+        header match {
+          case true =>
+            if (fileHeader.nonEmpty) {
+              throw new MalformedCarbonCommandException(
+                "When 'header' option is true, 'fileheader' option is not required.")
+            }
+          case false =>
+            // generate file header
+            if (fileHeader.isEmpty) {
+              fileHeader = table.getCreateOrderColumn(table.getFactTableName)
+                .asScala.map(_.getColName).mkString(",")
+            }
+        }
+      }
+
       carbonLoadModel.setDateFormat(dateFormat)
       carbonLoadModel.setDefaultTimestampFormat(carbonProperty.getProperty(
         CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
@@ -582,7 +612,7 @@ case class LoadTable(
         LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)")
         carbonLoadModel.setFactFilePath(factPath)
         carbonLoadModel.setCsvDelimiter(CarbonUtil.unescapeChar(delimeter))
-        carbonLoadModel.setCsvHeader(optionsFinal.get("fileheader").get)
+        carbonLoadModel.setCsvHeader(fileHeader)
         carbonLoadModel.setColDictFilePath(column_dict)
         carbonLoadModel.setDirectLoad(true)
         carbonLoadModel.setCsvHeaderColumns(CommonUtil.getCsvHeaderColumns(carbonLoadModel))


Mime
View raw message