carbondata-commits mailing list archives

From: ravipes...@apache.org
Subject: [1/2] incubator-carbondata git commit: add useKettle option for loading
Date: Wed, 26 Oct 2016 03:13:28 GMT
Repository: incubator-carbondata
Updated Branches:
  refs/heads/master fdc9454d9 -> 91146fd26


add useKettle option for loading
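
This change threads a boolean useKettle option (default: true) through both load paths:
the DataFrame write path reads it from CarbonOption ("useKettle"), and the LOAD DATA DDL
path reads it as USE_KETTLE. With useKettle = false, the load is routed to a new
processor-step flow that is still a stub and throws
UnsupportedOperationException("work in progress"). A minimal sketch of the DataFrame side,
assuming a "carbondata" format name, a DataFrame named df, and a table name invented for
illustration:

    // Hypothetical sketch; the format name, df, and table name are assumptions.
    df.write
      .format("carbondata")
      .option("tableName", "sample_table")
      .option("useKettle", "true") // default per CarbonOption; "false" selects the new flow
      .save()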


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/c72b1fb3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/c72b1fb3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/c72b1fb3

Branch: refs/heads/master
Commit: c72b1fb3d7cb5e0dec5f2a6bdbadb8c84252632a
Parents: fdc9454
Author: jackylk <jacky.likun@huawei.com>
Authored: Tue Oct 25 14:49:25 2016 +0800
Committer: ravipesala <ravi.pesala@gmail.com>
Committed: Wed Oct 26 08:40:56 2016 +0530

----------------------------------------------------------------------
 .../spark/CarbonDataFrameWriter.scala           |  3 +-
 .../apache/carbondata/spark/CarbonOption.scala  |  1 +
 .../spark/rdd/CarbonDataRDDFactory.scala        |  2 +-
 .../org/apache/spark/sql/CarbonSqlParser.scala  | 19 +++----
 .../execution/command/carbonTableSchema.scala   | 57 ++++++++++++++------
 .../spark/sql/hive/CarbonStrategies.scala       |  8 +--
 6 files changed, 60 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/c72b1fb3/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
index 65ff787..1939095 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
@@ -123,7 +123,8 @@ class CarbonDataFrameWriter(val dataFrame: DataFrame) extends Logging {
       Map(("fileheader" -> header)),
       false,
       null,
-      Some(dataFrame)).run(cc)
+      Some(dataFrame),
+      options.useKettle).run(cc)
   }
 
   private def csvPackage: String = "com.databricks.spark.csv.newapi"

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/c72b1fb3/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
index 71b18b9..e8bc97e 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
@@ -42,4 +42,5 @@ class CarbonOption(options: Map[String, String]) {
 
   def compress: Boolean = options.getOrElse("compress", "false").toBoolean
 
+  def useKettle: Boolean = options.getOrElse("useKettle", "true").toBoolean
 }
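
As a quick check of the default above (CarbonOption's constructor signature is taken from
the hunk header; the option maps are made up):

    // Absent key falls back to "true"; an explicit "false" disables the Kettle flow.
    new CarbonOption(Map.empty[String, String]).useKettle        // true
    new CarbonOption(Map("useKettle" -> "false")).useKettle      // false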

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/c72b1fb3/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 511f452..054dd90 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -667,7 +667,7 @@ object CarbonDataRDDFactory extends Logging {
       columinar: Boolean,
       isAgg: Boolean,
       partitionStatus: String = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS,
-      dataFrame: Option[DataFrame] = None) {
+      dataFrame: Option[DataFrame] = None): Unit = {
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
 
     // for handling of the segment Merging.

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/c72b1fb3/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
index 22019c8..da14b3e 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
@@ -925,26 +925,27 @@ class CarbonSqlParser()
     LOAD ~> DATA ~> opt(LOCAL) ~> INPATH ~> stringLit ~ opt(OVERWRITE) ~
       (INTO ~> TABLE ~> (ident <~ ".").? ~ ident) ~
       (OPTIONS ~> "(" ~> repsep(loadOptions, ",") <~ ")").? <~ opt(";") ^^ {
-        case filePath ~ isOverwrite ~ table ~ partionDataOptions =>
+        case filePath ~ isOverwrite ~ table ~ optionsList =>
           val (databaseNameOp, tableName) = table match {
             case databaseName ~ tableName => (databaseName, tableName.toLowerCase())
           }
-          if(partionDataOptions.isDefined) {
-            validateOptions(partionDataOptions)
+          if(optionsList.isDefined) {
+            validateOptions(optionsList)
           }
-          val patitionOptionsMap = partionDataOptions.getOrElse(List.empty[(String, String)]).toMap
-          LoadTable(databaseNameOp, tableName, filePath, Seq(), patitionOptionsMap,
-            isOverwrite.isDefined)
+          val optionsMap = optionsList.getOrElse(List.empty[(String, String)]).toMap
+          val useKettle = optionsMap.getOrElse("USE_KETTLE", "true").toBoolean
+          LoadTable(databaseNameOp, tableName, filePath, Seq(), optionsMap,
+            isOverwrite.isDefined, useKettle = useKettle)
       }
 
-  private def validateOptions(partionDataOptions: Option[List[(String, String)]]): Unit = {
+  private def validateOptions(optionList: Option[List[(String, String)]]): Unit = {
 
     // validate with all supported options
-    val options = partionDataOptions.get.groupBy(x => x._1)
+    val options = optionList.get.groupBy(x => x._1)
     val supportedOptions = Seq("DELIMITER", "QUOTECHAR", "FILEHEADER", "ESCAPECHAR", "MULTILINE",
       "COMPLEX_DELIMITER_LEVEL_1", "COMPLEX_DELIMITER_LEVEL_2", "COLUMNDICT",
       "SERIALIZATION_NULL_FORMAT", "BAD_RECORDS_LOGGER_ENABLE", "BAD_RECORDS_ACTION",
-      "ALL_DICTIONARY_PATH", "MAXCOLUMNS", "COMMENTCHAR"
+      "ALL_DICTIONARY_PATH", "MAXCOLUMNS", "COMMENTCHAR", "USE_KETTLE"
     )
     var isSupported = true
     val invalidOptions = StringBuilder.newBuilder
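
With the parser change above, USE_KETTLE becomes a recognized LOAD DATA option. A
hypothetical invocation, assuming a CarbonContext named cc; the table name and path are
made up:

    // 'USE_KETTLE' defaults to 'true'; 'false' routes the load to the processor-step flow.
    cc.sql("LOAD DATA INPATH '/tmp/sample.csv' INTO TABLE sample_table " +
      "OPTIONS('DELIMITER'=',', 'USE_KETTLE'='false')")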

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/c72b1fb3/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 919f310..19e23f7 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -1024,7 +1024,34 @@ case class LoadTable(
     tableName: String,
     factPathFromUser: String,
     dimFilesPath: Seq[DataLoadTableFileMapping],
-    partionValues: scala.collection.immutable.Map[String, String],
+    options: scala.collection.immutable.Map[String, String],
+    isOverwriteExist: Boolean = false,
+    var inputSqlString: String = null,
+    dataFrame: Option[DataFrame] = None,
+    useKettle: Boolean = true) extends RunnableCommand {
+
+  def run(sqlContext: SQLContext): Seq[Row] = {
+    if (useKettle) {
+      LoadTableUsingKettle(databaseNameOp, tableName, factPathFromUser, dimFilesPath,
+        options, isOverwriteExist, inputSqlString, dataFrame).run(sqlContext)
+    } else {
+      LoadTableUsingProcessorStep().run(sqlContext)
+    }
+  }
+}
+
+case class LoadTableUsingProcessorStep() extends RunnableCommand {
+  def run(sqlContext: SQLContext): Seq[Row] = {
+    throw new UnsupportedOperationException("work in progress")
+  }
+}
+
+case class LoadTableUsingKettle(
+    databaseNameOp: Option[String],
+    tableName: String,
+    factPathFromUser: String,
+    dimFilesPath: Seq[DataLoadTableFileMapping],
+    options: scala.collection.immutable.Map[String, String],
     isOverwriteExist: Boolean = false,
     var inputSqlString: String = null,
     dataFrame: Option[DataFrame] = None) extends RunnableCommand {
@@ -1104,19 +1131,19 @@ case class LoadTable(
       val columinar = sqlContext.getConf("carbon.is.columnar.storage", "true").toBoolean
       val kettleHomePath = CarbonScalaUtil.getKettleHome(sqlContext)
 
-      val delimiter = partionValues.getOrElse("delimiter", ",")
-      val quoteChar = partionValues.getOrElse("quotechar", "\"")
-      val fileHeader = partionValues.getOrElse("fileheader", "")
-      val escapeChar = partionValues.getOrElse("escapechar", "\\")
-      val commentchar = partionValues.getOrElse("commentchar", "#")
-      val columnDict = partionValues.getOrElse("columndict", null)
-      val serializationNullFormat = partionValues.getOrElse("serialization_null_format", "\\N")
-      val badRecordsLoggerEnable = partionValues.getOrElse("bad_records_logger_enable", "false")
-      val badRecordsLoggerRedirect = partionValues.getOrElse("bad_records_action", "force")
-      val allDictionaryPath = partionValues.getOrElse("all_dictionary_path", "")
-      val complex_delimiter_level_1 = partionValues.getOrElse("complex_delimiter_level_1", "\\$")
-      val complex_delimiter_level_2 = partionValues.getOrElse("complex_delimiter_level_2", "\\:")
-      val multiLine = partionValues.getOrElse("multiline", "false").trim.toLowerCase match {
+      val delimiter = options.getOrElse("delimiter", ",")
+      val quoteChar = options.getOrElse("quotechar", "\"")
+      val fileHeader = options.getOrElse("fileheader", "")
+      val escapeChar = options.getOrElse("escapechar", "\\")
+      val commentchar = options.getOrElse("commentchar", "#")
+      val columnDict = options.getOrElse("columndict", null)
+      val serializationNullFormat = options.getOrElse("serialization_null_format", "\\N")
+      val badRecordsLoggerEnable = options.getOrElse("bad_records_logger_enable", "false")
+      val badRecordsLoggerRedirect = options.getOrElse("bad_records_action", "force")
+      val allDictionaryPath = options.getOrElse("all_dictionary_path", "")
+      val complex_delimiter_level_1 = options.getOrElse("complex_delimiter_level_1", "\\$")
+      val complex_delimiter_level_2 = options.getOrElse("complex_delimiter_level_2", "\\:")
+      val multiLine = options.getOrElse("multiline", "false").trim.toLowerCase match {
         case "true" => true
         case "false" => false
         case illegal =>
@@ -1124,7 +1151,7 @@ case class LoadTable(
             "load DDL which you set can only be 'true' or 'false', please check your input
DDL."
           throw new MalformedCarbonCommandException(errorMessage)
       }
-      val maxColumns = partionValues.getOrElse("maxcolumns", null)
+      val maxColumns = options.getOrElse("maxcolumns", null)
       carbonLoadModel.setMaxColumns(maxColumns)
       carbonLoadModel.setEscapeChar(escapeChar)
       carbonLoadModel.setQuoteChar(quoteChar)
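
Grounded in the hunks above: the LoadTable facade keeps its call sites unchanged and
dispatches on useKettle, but the non-Kettle branch is still a stub. A hypothetical check
(sqlCtx is an assumed SQLContext; the table name and path are made up):

    // With useKettle = false the facade delegates to LoadTableUsingProcessorStep,
    // which currently throws UnsupportedOperationException("work in progress").
    val cmd = LoadTable(None, "sample_table", "/tmp/sample.csv", Nil,
      Map.empty[String, String], useKettle = false)
    cmd.run(sqlCtx)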

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/c72b1fb3/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
index 1a31229..0b4d020 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
@@ -227,12 +227,12 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
       case ShowLoadsCommand(databaseName, table, limit) =>
         ExecutedCommand(ShowLoads(databaseName, table, limit, plan.output)) :: Nil
       case LoadTable(databaseNameOp, tableName, factPathFromUser, dimFilesPath,
-      partionValues, isOverwriteExist, inputSqlString, _) =>
+      options, isOverwriteExist, inputSqlString, dataFrame, useKettle) =>
         val isCarbonTable = CarbonEnv.getInstance(sqlContext).carbonCatalog
             .tableExists(TableIdentifier(tableName, databaseNameOp))(sqlContext)
-        if (isCarbonTable || partionValues.nonEmpty) {
-          ExecutedCommand(LoadTable(databaseNameOp, tableName, factPathFromUser,
-            dimFilesPath, partionValues, isOverwriteExist, inputSqlString)) :: Nil
+        if (isCarbonTable || options.nonEmpty) {
+          ExecutedCommand(LoadTable(databaseNameOp, tableName, factPathFromUser, dimFilesPath,
+            options, isOverwriteExist, inputSqlString, dataFrame, useKettle)) :: Nil
         } else {
           ExecutedCommand(HiveNativeCommand(inputSqlString)) :: Nil
         }

