carbondata-commits mailing list archives

From jack...@apache.org
Subject [1/2] incubator-carbondata git commit: support loading RDD
Date Sat, 08 Oct 2016 15:38:49 GMT
Repository: incubator-carbondata
Updated Branches:
  refs/heads/master e8ce3637e -> 6be540b7b


support loading RDD

add test case and fix comments

fix comments


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/0018756d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/0018756d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/0018756d

Branch: refs/heads/master
Commit: 0018756de083417972adc3c34e13f89d31da61f9
Parents: e8ce363
Author: c00318382 <c00318382@huaweiobz.com>
Authored: Tue Sep 27 20:44:04 2016 +0530
Committer: jackylk <jacky.likun@huawei.com>
Committed: Sat Oct 8 23:36:36 2016 +0800

----------------------------------------------------------------------
 .../examples/DataFrameAPIExample.scala          |   2 -
 .../carbondata/spark/load/CarbonLoadModel.java  |  14 +
 .../carbondata/spark/load/CarbonLoaderUtil.java |   2 +
 .../apache/carbondata/spark/CarbonOption.scala  |   2 +
 .../spark/implicit/DataFrameFuncs.scala         |  48 ++-
 .../spark/rdd/CarbonDataLoadRDD.scala           | 373 +++++++++++++------
 .../spark/rdd/CarbonDataRDDFactory.scala        |  62 ++-
 .../spark/util/GlobalDictionaryUtil.scala       |  11 +-
 .../execution/command/carbonTableSchema.scala   |  15 +-
 .../spark/sql/hive/CarbonStrategies.scala       |   2 +-
 .../testsuite/dataload/TestLoadDataFrame.scala  |  95 +++++
 .../api/dataloader/DataLoadModel.java           |  10 +
 .../processing/csvload/DataGraphExecuter.java   |   4 +-
 .../processing/csvreaderstep/CsvInput.java      |  77 ++--
 .../processing/csvreaderstep/CsvInputMeta.java  |  17 +
 .../processing/csvreaderstep/RddInputUtils.java |  43 +++
 .../dataprocessor/DataProcessTaskStatus.java    |  10 +
 .../dataprocessor/IDataProcessStatus.java       |   2 +
 .../graphgenerator/GraphGenerator.java          |   4 +
 19 files changed, 623 insertions(+), 170 deletions(-)
----------------------------------------------------------------------
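
Before the per-file diffs, a minimal usage sketch of what this commit enables, mirroring the new TestLoadDataFrame test added below (the DataFrame `df` and the table name are illustrative):

import org.apache.spark.sql.SaveMode

// Write an existing DataFrame `df` straight into a CarbonData table.
// tempCSV = "false" takes the new direct RDD loading path added in this commit;
// with the default tempCSV = "true" the DataFrame is first staged as temporary
// CSV files and then loaded (optionally gzip-compressed via compress = "true").
df.write
  .format("carbondata")
  .option("tableName", "carbon_example")   // illustrative table name
  .option("tempCSV", "false")
  .mode(SaveMode.Overwrite)
  .save()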


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0018756d/examples/src/main/scala/org/apache/carbondata/examples/DataFrameAPIExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/carbondata/examples/DataFrameAPIExample.scala b/examples/src/main/scala/org/apache/carbondata/examples/DataFrameAPIExample.scala
index 88bab3c..2d9193f 100644
--- a/examples/src/main/scala/org/apache/carbondata/examples/DataFrameAPIExample.scala
+++ b/examples/src/main/scala/org/apache/carbondata/examples/DataFrameAPIExample.scala
@@ -17,8 +17,6 @@
 
 package org.apache.carbondata.examples
 
-import org.apache.spark.sql.SaveMode
-
 import org.apache.carbondata.examples.util.ExampleUtils
 
 // scalastyle:off println

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0018756d/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoadModel.java b/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoadModel.java
index 5b7f02d..3fd481b 100644
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoadModel.java
+++ b/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoadModel.java
@@ -127,6 +127,11 @@ public class CarbonLoadModel implements Serializable {
   private String maxColumns;
 
   /**
+   * key of the RDD iterator in the RDD iterator map
+   */
+  private String rddIteratorKey;
+
+  /**
    * get escape char
    * @return
    */
@@ -621,4 +626,13 @@ public class CarbonLoadModel implements Serializable {
   public void setBadRecordsLoggerRedirect(String badRecordsLoggerRedirect) {
     this.badRecordsLoggerRedirect = badRecordsLoggerRedirect;
   }
+
+  public String getRddIteratorKey() {
+    return rddIteratorKey;
+  }
+
+  public void setRddIteratorKey(String rddIteratorKey) {
+    this.rddIteratorKey = rddIteratorKey;
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0018756d/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java b/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
index 9620ebc..18c9538 100644
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
+++ b/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
@@ -131,6 +131,7 @@ public final class CarbonLoaderUtil {
     model.setEscapeCharacter(schmaModel.getEscapeCharacter());
     model.setQuoteCharacter(schmaModel.getQuoteCharacter());
     model.setCommentCharacter(schmaModel.getCommentCharacter());
+    model.setRddIteratorKey(schmaModel.getRddIteratorKey());
     model.setTaskNo(loadModel.getTaskNo());
     model.setFactTimeStamp(loadModel.getFactTimeStamp());
     model.setMaxColumns(loadModel.getMaxColumns());
@@ -187,6 +188,7 @@ public final class CarbonLoaderUtil {
     schmaModel.setEscapeCharacter(loadModel.getEscapeChar());
     schmaModel.setQuoteCharacter(loadModel.getQuoteChar());
     schmaModel.setCommentCharacter(loadModel.getCommentChar());
+    schmaModel.setRddIteratorKey(loadModel.getRddIteratorKey());
     SchemaInfo info = new SchemaInfo();
 
     info.setDatabaseName(databaseName);

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0018756d/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
index 9115d14..d4aa3f4 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
@@ -36,6 +36,8 @@ class CarbonOption(options: Map[String, String]) {
       "org.apache.carbondata.spark.partition.api.impl.SampleDataPartitionerImpl")
   }
 
+  def tempCSV: String = options.getOrElse("tempCSV", "true")
+
   def compress: String = options.getOrElse("compress", "false")
 
 }
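
For reference, a small sketch of how these writer options are consumed (CarbonOption and its tableName accessor live in this file; the values below are only examples):

// Options passed via DataFrameWriter.option(...) arrive here as a Map.
// tempCSV defaults to "true" (stage temporary CSV files first) and
// compress defaults to "false".
val options = new CarbonOption(Map("tableName" -> "t1", "tempCSV" -> "false"))
options.tableName   // "t1"
options.tempCSV     // "false"
options.compress    // "false" (default)
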

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0018756d/integration/spark/src/main/scala/org/apache/carbondata/spark/implicit/DataFrameFuncs.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/implicit/DataFrameFuncs.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/implicit/DataFrameFuncs.scala
index cf44364..8585e07 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/implicit/DataFrameFuncs.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/implicit/DataFrameFuncs.scala
@@ -19,7 +19,8 @@ package org.apache.carbondata.spark
 
 import org.apache.hadoop.fs.Path
 import org.apache.spark.Logging
-import org.apache.spark.sql.{CarbonContext, DataFrame, DataFrameWriter, SaveMode}
+import org.apache.spark.sql._
+import org.apache.spark.sql.execution.command.LoadTable
 import org.apache.spark.sql.types._
 
 import org.apache.carbondata.core.carbon.metadata.datatype.{DataType => CarbonType}
@@ -35,16 +36,31 @@ class DataFrameFuncs(dataFrame: DataFrame) extends Logging {
       "Error in saving dataframe to carbon file, must use CarbonContext to save dataframe"
     )
 
+    val cc = CarbonContext.getInstance(dataFrame.sqlContext.sparkContext)
     val options = new CarbonOption(parameters)
-    val tableName = options.tableName
+    cc.sql(makeCreateTableString(dataFrame.schema, options))
+
+    if (options.tempCSV.equals("true")) {
+      loadTempCSV(options, cc)
+    } else {
+      loadDataFrame(options, cc)
+    }
+  }
 
+  /**
+   * First, save the DataFrame to temporary CSV files,
+   * then load those CSV files into the Carbon table.
+   * @param options
+   * @param cc
+   */
+  private def loadTempCSV(options: CarbonOption, cc: CarbonContext): Unit = {
     // temporary solution: write to csv file, then load the csv into carbon
     val tempCSVFolder = s"./tempCSV"
     var writer: DataFrameWriter =
       dataFrame.write
-          .format(csvPackage)
-          .option("header", "false")
-          .mode(SaveMode.Overwrite)
+        .format(csvPackage)
+        .option("header", "false")
+        .mode(SaveMode.Overwrite)
 
     if (options.compress.equals("true")) {
       writer = writer.option("codec", "gzip")
@@ -52,7 +68,6 @@ class DataFrameFuncs(dataFrame: DataFrame) extends Logging {
 
     writer.save(tempCSVFolder)
 
-    val cc = CarbonContext.getInstance(dataFrame.sqlContext.sparkContext)
     val tempCSVPath = new Path(tempCSVFolder)
     val fs = tempCSVPath.getFileSystem(dataFrame.sqlContext.sparkContext.hadoopConfiguration)
 
@@ -69,14 +84,31 @@ class DataFrameFuncs(dataFrame: DataFrame) extends Logging {
     }
 
     try {
-      cc.sql(makeCreateTableString(dataFrame.schema, options))
       logInfo(s"temporary CSV file size: ${countSize() / 1024 / 1024} MB")
-      cc.sql(makeLoadString(tableName, tempCSVFolder))
+      cc.sql(makeLoadString(options.tableName, tempCSVFolder))
     } finally {
       fs.delete(tempCSVPath, true)
     }
   }
 
+  /**
+   * Load the DataFrame directly, without first saving it to CSV files.
+   * @param options
+   * @param cc
+   */
+  private def loadDataFrame(options: CarbonOption, cc: CarbonContext): Unit = {
+    val header = dataFrame.columns.mkString(",")
+    LoadTable(
+      Some(options.dbName),
+      options.tableName,
+      null,
+      Seq(),
+      Map(("fileheader" -> header)),
+      false,
+      null,
+      Some(dataFrame)).run(cc)
+  }
+
   private def csvPackage: String = "com.databricks.spark.csv.newapi"
 
   private def convertToCarbonType(sparkType: DataType): String = {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0018756d/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
index 86d58a5..3f411bc 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
@@ -19,6 +19,7 @@
 package org.apache.carbondata.spark.rdd
 
 import java.lang.Long
+import java.text.SimpleDateFormat
 import java.util
 import java.util.UUID
 
@@ -26,8 +27,10 @@ import scala.collection.JavaConverters._
 import scala.util.Random
 
 import org.apache.spark.{Logging, Partition, SerializableWritable, SparkContext, SparkEnv, TaskContext}
+import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.execution.command.Partitioner
+import org.apache.spark.sql.Row
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.common.logging.impl.StandardLogService
@@ -35,6 +38,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.load.{BlockDetails, LoadMetadataDetails}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory}
 import org.apache.carbondata.processing.constants.DataProcessorConstants
+import org.apache.carbondata.processing.csvreaderstep.RddInputUtils
 import org.apache.carbondata.processing.etl.DataLoadingException
 import org.apache.carbondata.processing.graphgenerator.GraphGenerator
 import org.apache.carbondata.spark.DataLoadResult
@@ -72,8 +76,91 @@ class CarbonNodePartition(rddId: Int, val idx: Int, host: String,
   override def hashCode(): Int = 41 * (41 + rddId) + idx
 }
 
+class SparkPartitionLoader(model: CarbonLoadModel,
+                           splitIndex: Int,
+                           hdfsStoreLocation: String,
+                           kettleHomePath: String,
+                           loadCount: Int,
+                           loadMetadataDetails: LoadMetadataDetails) extends Logging{
+
+  var storeLocation: String = ""
+
+  def initialize(): Unit = {
+    val carbonPropertiesFilePath = System.getProperty("carbon.properties.filepath", null)
+    if (null == carbonPropertiesFilePath) {
+      System.setProperty("carbon.properties.filepath",
+        System.getProperty("user.dir") + '/' + "conf" + '/' + "carbon.properties")
+    }
+    CarbonTimeStatisticsFactory.getLoadStatisticsInstance.initPartitonInfo(model.getPartitionId)
+    CarbonProperties.getInstance().addProperty("carbon.is.columnar.storage", "true")
+    CarbonProperties.getInstance().addProperty("carbon.dimension.split.value.in.columnar", "1")
+    CarbonProperties.getInstance().addProperty("carbon.is.fullyfilled.bits", "true")
+    CarbonProperties.getInstance().addProperty("is.int.based.indexer", "true")
+    CarbonProperties.getInstance().addProperty("aggregate.columnar.keyblock", "true")
+    CarbonProperties.getInstance().addProperty("high.cardinality.value", "100000")
+    CarbonProperties.getInstance().addProperty("is.compressed.keyblock", "false")
+    CarbonProperties.getInstance().addProperty("carbon.leaf.node.size", "120000")
+
+    // this property is used to determine whether temp location for carbon is inside
+    // container temp dir or is yarn application directory.
+    val carbonUseLocalDir = CarbonProperties.getInstance()
+      .getProperty("carbon.use.local.dir", "false")
+    if(carbonUseLocalDir.equalsIgnoreCase("true")) {
+      val storeLocations = CarbonLoaderUtil.getConfiguredLocalDirs(SparkEnv.get.conf)
+      if (null != storeLocations && storeLocations.nonEmpty) {
+        storeLocation = storeLocations(Random.nextInt(storeLocations.length))
+      }
+      if (storeLocation == null) {
+        storeLocation = System.getProperty("java.io.tmpdir")
+      }
+    }
+    else {
+      storeLocation = System.getProperty("java.io.tmpdir")
+    }
+    storeLocation = storeLocation + '/' + System.nanoTime() + '/' + splitIndex
+  }
+
+  def run(): Unit = {
+    try {
+      CarbonLoaderUtil.executeGraph(model, storeLocation, hdfsStoreLocation,
+        kettleHomePath)
+    } catch {
+      case e: DataLoadingException => if (e.getErrorCode ==
+        DataProcessorConstants.BAD_REC_FOUND) {
+        loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
+        logInfo("Bad Record Found")
+      } else {
+        throw e
+      }
+      case e: Exception =>
+        throw e
+    } finally {
+      // delete temp location data
+      try {
+        val isCompaction = false
+        CarbonLoaderUtil.deleteLocalDataLoadFolderLocation(model, isCompaction)
+      } catch {
+        case e: Exception =>
+          logError("Failed to delete local data", e)
+      }
+      if (!CarbonCommonConstants.STORE_LOADSTATUS_FAILURE.equals(
+        loadMetadataDetails.getLoadStatus)) {
+        if (CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS
+          .equals(loadMetadataDetails.getLoadStatus)) {
+          logInfo("DataLoad complete")
+          logInfo("Data Load partially successful with LoadCount:" + loadCount)
+        } else {
+          logInfo("DataLoad complete")
+          logInfo("Data Loaded successfully with LoadCount:" + loadCount)
+          CarbonTimeStatisticsFactory.getLoadStatisticsInstance.printStatisticsInfo(
+            model.getPartitionId)
+        }
+      }
+    }
+  }
+}
 /**
- * Use this RDD class to load data
+ * Use this RDD class to load CSV data files
  *
  * @param sc                    The SparkContext to associate the RDD with.
  * @param result                Output result
@@ -91,7 +178,7 @@ class CarbonNodePartition(rddId: Int, val idx: Int, host: String,
  * @tparam K Class of the key associated with the Result.
  * @tparam V Class of the value associated with the Result.
  */
-class CarbonDataLoadRDD[K, V](
+class DataFileLoaderRDD[K, V](
     sc: SparkContext,
     result: DataLoadResult[K, V],
     carbonLoadModel: CarbonLoadModel,
@@ -104,9 +191,7 @@ class CarbonDataLoadRDD[K, V](
     tableCreationTime: Long,
     schemaLastUpdatedTime: Long,
     blocksGroupBy: Array[(String, Array[BlockDetails])],
-    isTableSplitPartition: Boolean)
-  extends RDD[(K, V)](sc, Nil)
-    with Logging {
+    isTableSplitPartition: Boolean) extends RDD[(K, V)](sc, Nil) with Logging {
 
   sc.setLocalProperty("spark.scheduler.pool", "DDL")
 
@@ -145,92 +230,29 @@ class CarbonDataLoadRDD[K, V](
   override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     val iter = new Iterator[(K, V)] {
-      var dataloadStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
       var partitionID = "0"
+      val loadMetadataDetails = new LoadMetadataDetails()
       var model: CarbonLoadModel = _
       var uniqueLoadStatusId = carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE +
                                theSplit.index
       try {
-        val carbonPropertiesFilePath = System.getProperty("carbon.properties.filepath", null)
-        if (null == carbonPropertiesFilePath) {
-          System.setProperty("carbon.properties.filepath",
-            System.getProperty("user.dir") + '/' + "conf" + '/' + "carbon.properties")
-        }
+        loadMetadataDetails.setPartitionCount(partitionID)
+        loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)
+
         carbonLoadModel.setSegmentId(String.valueOf(loadCount))
         setModelAndBlocksInfo()
-        CarbonTimeStatisticsFactory.getLoadStatisticsInstance.initPartitonInfo(model.getPartitionId)
-        CarbonProperties.getInstance().addProperty("carbon.is.columnar.storage", "true")
-        CarbonProperties.getInstance().addProperty("carbon.dimension.split.value.in.columnar", "1")
-        CarbonProperties.getInstance().addProperty("carbon.is.fullyfilled.bits", "true")
-        CarbonProperties.getInstance().addProperty("is.int.based.indexer", "true")
-        CarbonProperties.getInstance().addProperty("aggregate.columnar.keyblock", "true")
-        CarbonProperties.getInstance().addProperty("high.cardinality.value", "100000")
-        CarbonProperties.getInstance().addProperty("is.compressed.keyblock", "false")
-        CarbonProperties.getInstance().addProperty("carbon.leaf.node.size", "120000")
-
-        // this property is used to determine whether temp location for carbon is inside
-        // container temp dir or is yarn application directory.
-        val carbonUseLocalDir = CarbonProperties.getInstance()
-          .getProperty("carbon.use.local.dir", "false")
-
-        if(carbonUseLocalDir.equalsIgnoreCase("true")) {
-          val storeLocations = CarbonLoaderUtil.getConfiguredLocalDirs(SparkEnv.get.conf)
-          if (null != storeLocations && storeLocations.nonEmpty) {
-            storeLocation = storeLocations(Random.nextInt(storeLocations.length))
-          }
-          if (storeLocation == null) {
-            storeLocation = System.getProperty("java.io.tmpdir")
-          }
-        }
-        else {
-          storeLocation = System.getProperty("java.io.tmpdir")
-        }
-        storeLocation = storeLocation + '/' + System.nanoTime() + '/' + theSplit.index
-        dataloadStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
-
+        val loader = new SparkPartitionLoader(model, theSplit.index, hdfsStoreLocation,
+          kettleHomePath, loadCount, loadMetadataDetails)
+        loader.initialize
+        loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
         if (model.isRetentionRequest) {
           recreateAggregationTableForRetention
         }
         else if (model.isAggLoadRequest) {
-          dataloadStatus = createManualAggregateTable
+          loadMetadataDetails.setLoadStatus(createManualAggregateTable)
         }
         else {
-          try {
-            CarbonLoaderUtil.executeGraph(model, storeLocation, hdfsStoreLocation, kettleHomePath)
-          } catch {
-            case e: DataLoadingException => if (e.getErrorCode ==
-                                                DataProcessorConstants.BAD_REC_FOUND) {
-              dataloadStatus = CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS
-              logInfo("Bad Record Found")
-            } else {
-              throw e
-            }
-            case e: Exception =>
-              throw e
-          } finally {
-            // delete temp location data
-            val newSlice = CarbonCommonConstants.LOAD_FOLDER + loadCount
-            try {
-              val isCompaction = false
-              CarbonLoaderUtil
-                .deleteLocalDataLoadFolderLocation(model, isCompaction)
-            } catch {
-              case e: Exception =>
-                LOGGER.error(e)
-            }
-            if (!CarbonCommonConstants.STORE_LOADSTATUS_FAILURE.equals(dataloadStatus)) {
-              if (CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS
-                  .equals(dataloadStatus)) {
-                logInfo("DataLoad complete")
-                logInfo("Data Load partially successful with LoadCount:" + loadCount)
-              } else {
-                logInfo("DataLoad complete")
-                logInfo("Data Loaded successfully with LoadCount:" + loadCount)
-                CarbonTimeStatisticsFactory.getLoadStatisticsInstance.printStatisticsInfo(
-                  model.getPartitionId)
-              }
-            }
-          }
+          loader.run
         }
       } catch {
         case e: Exception =>
@@ -322,9 +344,10 @@ class CarbonDataLoadRDD[K, V](
           val copyListOfUpdatedLoadFolders = listOfUpdatedLoadFolders.asScala.toList
           loadTableSlices(listOfAllLoadFolders, details)
           val loadFolders = Array[String]()
-          dataloadStatus = iterateOverAggTables(aggTables, copyListOfLoadFolders.asJava,
-            copyListOfUpdatedLoadFolders.asJava, loadFolders)
-          if (CarbonCommonConstants.STORE_LOADSTATUS_FAILURE.equals(dataloadStatus)) {
+          loadMetadataDetails.setLoadStatus(iterateOverAggTables(aggTables,
+            copyListOfLoadFolders.asJava, copyListOfUpdatedLoadFolders.asJava, loadFolders))
+          if (CarbonCommonConstants.STORE_LOADSTATUS_FAILURE.equals(
+            loadMetadataDetails.getLoadStatus)) {
             // remove the current slice from memory not the table
             CarbonLoaderUtil
               .removeSliceFromMemory(model.getDatabaseName, model.getTableName, newSlice)
@@ -334,7 +357,7 @@ class CarbonDataLoadRDD[K, V](
             logInfo("Aggregate tables creation successfull")
           }
         }
-        dataloadStatus
+        loadMetadataDetails.getLoadStatus
       }
 
       def loadTableSlices(listOfAllLoadFolders: java.util.List[String],
@@ -351,14 +374,15 @@ class CarbonDataLoadRDD[K, V](
         loadTableSlices(listOfAllLoadFolders, details)
         val loadFolders = Array[String]()
         val aggTable = model.getAggTableName
-        dataloadStatus = loadAggregationTable(listOfLoadFolders, listOfUpdatedLoadFolders,
-          loadFolders)
-        if (CarbonCommonConstants.STORE_LOADSTATUS_FAILURE.equals(dataloadStatus)) {
+        loadMetadataDetails.setLoadStatus(loadAggregationTable(listOfLoadFolders,
+          listOfUpdatedLoadFolders, loadFolders))
+        if (CarbonCommonConstants.STORE_LOADSTATUS_FAILURE.equals(
+          loadMetadataDetails.getLoadStatus)) {
           logInfo(s"Aggregate table creation failed :: $aggTable")
         } else {
           logInfo(s"Aggregate table creation successfull :: $aggTable")
         }
-        dataloadStatus
+        loadMetadataDetails.getLoadStatus
       }
 
       def recreateAggregationTableForRetention = {
@@ -383,40 +407,31 @@ class CarbonDataLoadRDD[K, V](
         model.setAggLoadRequest(true)
         aggTables.asScala.foreach { aggTable =>
           model.setAggTableName(aggTable)
-          dataloadStatus = loadAggregationTable(listOfLoadFolders, listOfUpdatedLoadFolders,
-            loadFolders)
-          if (CarbonCommonConstants.STORE_LOADSTATUS_FAILURE.equals(dataloadStatus)) {
+          loadMetadataDetails.setLoadStatus(loadAggregationTable(listOfLoadFolders,
+            listOfUpdatedLoadFolders, loadFolders))
+          if (CarbonCommonConstants.STORE_LOADSTATUS_FAILURE.equals(
+            loadMetadataDetails.getLoadStatus)) {
             logInfo(s"Aggregate table creation failed :: aggTable")
-            return dataloadStatus
+            return loadMetadataDetails.getLoadStatus
           }
         }
-        dataloadStatus
+        loadMetadataDetails.getLoadStatus
       }
 
       def loadAggregationTable(listOfLoadFolders: java.util.List[String],
           listOfUpdatedLoadFolders: java.util.List[String],
           loadFolders: Array[String]): String = {
         // TODO: Implement it
-        dataloadStatus
+        loadMetadataDetails.getLoadStatus
       }
 
       var finished = false
-
       override def hasNext: Boolean = {
-
-        if (!finished) {
-          finished = true
-          finished
-        }
-        else {
-          !finished
-        }
+        !finished
       }
 
       override def next(): (K, V) = {
-        val loadMetadataDetails = new LoadMetadataDetails()
-        loadMetadataDetails.setPartitionCount(partitionID)
-        loadMetadataDetails.setLoadStatus(dataloadStatus)
+        finished = true
         result.getKey(uniqueLoadStatusId, loadMetadataDetails)
       }
     }
@@ -462,3 +477,153 @@ class CarbonDataLoadRDD[K, V](
   }
 }
 
+/**
+ * Use this RDD class to load data from a DataFrame (RDD[Row])
+ * @param sc
+ * @param result
+ * @param carbonLoadModel
+ * @param storeLocation
+ * @param hdfsStoreLocation
+ * @param kettleHomePath
+ * @param columinar
+ * @param loadCount
+ * @param tableCreationTime
+ * @param schemaLastUpdatedTime
+ * @param prev
+ * @tparam K
+ * @tparam V
+ */
+class DataFrameLoaderRDD[K, V](
+    sc: SparkContext,
+    result: DataLoadResult[K, V],
+    carbonLoadModel: CarbonLoadModel,
+    var storeLocation: String,
+    hdfsStoreLocation: String,
+    kettleHomePath: String,
+    columinar: Boolean,
+    loadCount: Integer,
+    tableCreationTime: Long,
+    schemaLastUpdatedTime: Long,
+    prev: RDD[Row]) extends RDD[(K, V)](prev) with Logging {
+
+  sc.setLocalProperty("spark.scheduler.pool", "DDL")
+
+  @DeveloperApi
+  override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+    val resultIter = new Iterator[(K, V)] {
+      var partitionID = "0"
+      val loadMetadataDetails = new LoadMetadataDetails()
+      var uniqueLoadStatusId = carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE +
+        theSplit.index
+      try {
+        loadMetadataDetails.setPartitionCount(partitionID)
+        loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)
+        carbonLoadModel.setPartitionId(partitionID)
+        carbonLoadModel.setSegmentId(String.valueOf(loadCount))
+        carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
+        val loader = new SparkPartitionLoader(carbonLoadModel, theSplit.index, hdfsStoreLocation,
+          kettleHomePath, loadCount, loadMetadataDetails)
+        loader.initialize
+        loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
+        val rddIteratorKey = UUID.randomUUID().toString
+        try{
+          RddInputUtils.put(rddIteratorKey,
+            new RddIterator(firstParent[Row].iterator(theSplit, context), carbonLoadModel))
+          carbonLoadModel.setRddIteratorKey(rddIteratorKey)
+          loader.run
+        } finally {
+          RddInputUtils.remove(rddIteratorKey)
+        }
+      } catch {
+        case e: Exception =>
+          logInfo("DataLoad failure")
+          LOGGER.error(e)
+          throw e
+      }
+
+      var finished = false
+      override def hasNext: Boolean = !finished
+
+      override def next(): (K, V) = {
+        finished = true
+        result.getKey(uniqueLoadStatusId, loadMetadataDetails)
+      }
+    }
+    resultIter
+  }
+
+  override protected def getPartitions: Array[Partition] = firstParent[Row].partitions
+}
+
+/**
+ * This class wraps a Scala Iterator as a Java Iterator.
+ * It also converts all columns to strings so that the CSV data loading flow can be reused.
+ * @param rddIter
+ * @param carbonLoadModel
+ */
+class RddIterator(rddIter: Iterator[Row],
+                  carbonLoadModel: CarbonLoadModel) extends java.util.Iterator[Array[String]] {
+  val formatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
+    .CARBON_TIMESTAMP_FORMAT, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+  val format = new SimpleDateFormat(formatString)
+  val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
+  val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
+
+  def hasNext: Boolean = rddIter.hasNext
+
+  private def getString(value: Any, level: Int = 1): String = {
+    value == null match {
+      case true => ""
+      case false => value match {
+        case s: String => s
+        case i: java.lang.Integer => i.toString
+        case d: java.lang.Double => d.toString
+        case t: java.sql.Timestamp => format format t
+        case d: java.sql.Date => format format d
+        case d: java.math.BigDecimal => d.toPlainString
+        case b: java.lang.Boolean => b.toString
+        case s: java.lang.Short => s.toString
+        case f: java.lang.Float => f.toString
+        case bs: Array[Byte] => new String(bs)
+        case s: scala.collection.Seq[Any] =>
+          val delimiter = if (level == 1) {
+            delimiterLevel1
+          } else {
+            delimiterLevel2
+          }
+          val builder = new StringBuilder()
+          s.foreach { x =>
+            builder.append(getString(x, level + 1)).append(delimiter)
+          }
+          builder.substring(0, builder.length - 1)
+        case m: scala.collection.Map[Any, Any] =>
+          throw new Exception("Unsupported data type: Map")
+        case r: org.apache.spark.sql.Row =>
+          val delimiter = if (level == 1) {
+            delimiterLevel1
+          } else {
+            delimiterLevel2
+          }
+          val builder = new StringBuilder()
+          for (i <- 0 until r.length) {
+            builder.append(getString(r(i), level + 1)).append(delimiter)
+          }
+          builder.substring(0, builder.length - 1)
+        case other => other.toString
+      }
+    }
+  }
+
+  def next: Array[String] = {
+    val row = rddIter.next()
+    val columns = new Array[String](row.length)
+    for (i <- 0 until row.length) {
+      columns(i) = getString(row(i))
+    }
+    columns
+  }
+
+  def remove(): Unit = {
+  }
+}
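
As a standalone illustration of the complex-type flattening rule in RddIterator.getString above: level-1 collections are joined with the level-1 complex delimiter and nested collections with the level-2 delimiter. The delimiter values "$" and ":" below are assumptions chosen for the example, not values taken from this diff:

// Mirrors getString's recursion for Seq values; null becomes "".
def flatten(value: Any, level: Int = 1): String = value match {
  case null => ""
  case s: Seq[_] =>
    val delim = if (level == 1) "$" else ":"
    s.map(flatten(_, level + 1)).mkString(delim)
  case other => other.toString
}

flatten(Seq(Seq(1, 2), Seq(3)))   // "1:2$3"
flatten(null)                     // ""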

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0018756d/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index a64cf27..1bc7ad0 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -30,9 +30,9 @@ import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.mapreduce.Job
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
 import org.apache.spark.{util => _, _}
-import org.apache.spark.sql.{CarbonEnv, SQLContext}
+import org.apache.spark.sql.{CarbonEnv, DataFrame, SQLContext}
 import org.apache.spark.sql.execution.command.{AlterTableModel, CompactionCallableModel, CompactionModel, Partitioner}
-import org.apache.spark.sql.hive.{DistributionUtil, TableMeta}
+import org.apache.spark.sql.hive.{DistributionUtil}
 import org.apache.spark.util.{FileUtils, SplitUtils}
 
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -47,12 +47,11 @@ import org.apache.carbondata.integration.spark.merger.{CarbonCompactionUtil, Com
 import org.apache.carbondata.lcm.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
 import org.apache.carbondata.lcm.status.SegmentStatusManager
 import org.apache.carbondata.processing.etl.DataLoadingException
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
 import org.apache.carbondata.spark._
 import org.apache.carbondata.spark.load._
 import org.apache.carbondata.spark.merger.CarbonDataMergerUtil
 import org.apache.carbondata.spark.splits.TableSplit
-import org.apache.carbondata.spark.util.{CarbonQueryUtil, CarbonScalaUtil, LoadMetadataUtil}
+import org.apache.carbondata.spark.util.{CarbonQueryUtil, LoadMetadataUtil}
 
 /**
  * This is the factory class which can create different RDD depends on user needs.
@@ -667,7 +666,8 @@ object CarbonDataRDDFactory extends Logging {
       partitioner: Partitioner,
       columinar: Boolean,
       isAgg: Boolean,
-      partitionStatus: String = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS) {
+      partitionStatus: String = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS,
+      dataFrame: Option[DataFrame] = None) {
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
 
     // for handling of the segment Merging.
@@ -817,7 +817,9 @@ object CarbonDataRDDFactory extends Logging {
       // CarbonCommonConstants.TABLE_SPLIT_PARTITION_DEFAULT_VALUE).toBoolean
       val isTableSplitPartition = false
       var blocksGroupBy: Array[(String, Array[BlockDetails])] = null
-      isTableSplitPartition match {
+      var status: Array[(String, LoadMetadataDetails)] = null
+
+      def loadDataFile(): Unit = { isTableSplitPartition match {
         case true =>
           /*
            * when data handle by table split partition
@@ -856,7 +858,7 @@ object CarbonDataRDDFactory extends Logging {
                 val pathBuilder = new StringBuilder()
                 pathBuilder.append(carbonLoadModel.getFactFilePath)
                 if (!carbonLoadModel.getFactFilePath.endsWith("/")
-                    && !carbonLoadModel.getFactFilePath.endsWith("\\")) {
+                  && !carbonLoadModel.getFactFilePath.endsWith("\\")) {
                   pathBuilder.append("/")
                 }
                 pathBuilder.append(split.getPartition.getUniqueID).append("/")
@@ -941,18 +943,9 @@ object CarbonDataRDDFactory extends Logging {
             (entry._1, blockDetailsList)
           }
           ).toArray
-      }
+        }
 
-      CarbonLoaderUtil.checkAndCreateCarbonDataLocation(hdfsStoreLocation,
-        carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName,
-        partitioner.partitionCount, currentLoadCount.toString)
-      var loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
-      var status: Array[(String, LoadMetadataDetails)] = null
-      var errorMessage: String = "DataLoad failure"
-      var executorMessage: String = ""
-      try {
-        status = new
-            CarbonDataLoadRDD(sqlContext.sparkContext,
+        status = new DataFileLoaderRDD(sqlContext.sparkContext,
               new DataLoadResultImpl(),
               carbonLoadModel,
               storeLocation,
@@ -966,6 +959,39 @@ object CarbonDataRDDFactory extends Logging {
               blocksGroupBy,
               isTableSplitPartition
             ).collect()
+      }
+
+      def loadDataFrame(): Unit = {
+        var rdd = dataFrame.get.rdd
+        var numPartitions = DistributionUtil.getNodeList(sqlContext.sparkContext).length
+        numPartitions = Math.max(1, Math.min(numPartitions, rdd.partitions.length))
+        rdd = rdd.coalesce(numPartitions, false)
+
+        status = new DataFrameLoaderRDD(sqlContext.sparkContext,
+          new DataLoadResultImpl(),
+          carbonLoadModel,
+          storeLocation,
+          hdfsStoreLocation,
+          kettleHomePath,
+          columinar,
+          currentLoadCount,
+          tableCreationTime,
+          schemaLastUpdatedTime,
+          rdd).collect()
+      }
+
+      CarbonLoaderUtil.checkAndCreateCarbonDataLocation(hdfsStoreLocation,
+        carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName,
+        partitioner.partitionCount, currentLoadCount.toString)
+      var loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
+      var errorMessage: String = "DataLoad failure"
+      var executorMessage: String = ""
+      try {
+        if (dataFrame.isDefined) {
+          loadDataFrame()
+        } else {
+          loadDataFile()
+        }
         val newStatusMap = scala.collection.mutable.Map.empty[String, String]
         status.foreach { eachLoadStatus =>
           val state = newStatusMap.get(eachLoadStatus._1)

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0018756d/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
index e714520..09c32c8 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -29,7 +29,7 @@ import scala.util.control.Breaks.{break, breakable}
 import org.apache.commons.lang3.{ArrayUtils, StringUtils}
 import org.apache.spark.{Accumulator, Logging}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{CarbonEnv, CarbonRelation, DataFrame, SQLContext}
+import org.apache.spark.sql._
 import org.apache.spark.sql.hive.CarbonMetastoreCatalog
 import org.apache.spark.util.FileUtils
 
@@ -742,7 +742,8 @@ object GlobalDictionaryUtil extends Logging {
    */
   def generateGlobalDictionary(sqlContext: SQLContext,
                                carbonLoadModel: CarbonLoadModel,
-                               hdfsLocation: String): Unit = {
+                               hdfsLocation: String,
+                               dataFrame: Option[DataFrame] = None): Unit = {
     try {
       val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getAbsoluteTableIdentifier
         .getCarbonTableIdentifier
@@ -760,7 +761,11 @@ object GlobalDictionaryUtil extends Logging {
       if(StringUtils.isEmpty(allDictionaryPath)) {
         logInfo("Generate global dictionary from source data files!")
         // load data by using dataSource com.databricks.spark.csv
-        var df = loadDataFrame(sqlContext, carbonLoadModel)
+        var df = if (dataFrame.isDefined) {
+          dataFrame.get
+        } else {
+          loadDataFrame(sqlContext, carbonLoadModel)
+        }
         var headers = if (StringUtils.isEmpty(carbonLoadModel.getCsvHeader)) {
           df.columns
         }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0018756d/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 0ada74a..bbc0ba7 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -837,7 +837,7 @@ private[sql] case class AlterTableCompaction(alterTableModel: AlterTableModel) e
   }
 }
 
-private[sql] case class CreateTable(cm: tableModel) extends RunnableCommand {
+case class CreateTable(cm: tableModel) extends RunnableCommand {
 
   def run(sqlContext: SQLContext): Seq[Row] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
@@ -1024,14 +1024,15 @@ private[sql] case class DeleteLoadsByLoadDate(
 
 }
 
-private[sql] case class LoadTable(
+case class LoadTable(
     databaseNameOp: Option[String],
     tableName: String,
     factPathFromUser: String,
     dimFilesPath: Seq[DataLoadTableFileMapping],
     partionValues: Map[String, String],
     isOverwriteExist: Boolean = false,
-    var inputSqlString: String = null) extends RunnableCommand {
+    var inputSqlString: String = null,
+    dataFrame: Option[DataFrame] = None) extends RunnableCommand {
 
   val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
 
@@ -1070,7 +1071,8 @@ private[sql] case class LoadTable(
         sys.error("Table is locked for updation. Please try after some time")
       }
 
-      val factPath = FileUtils.getPaths(CarbonUtil.checkAndAppendHDFSUrl(factPathFromUser))
+      val factPath = if (dataFrame.isDefined) "" else FileUtils.getPaths(
+        CarbonUtil.checkAndAppendHDFSUrl(factPathFromUser))
       val carbonLoadModel = new CarbonLoadModel()
       carbonLoadModel.setTableName(relation.tableMeta.carbonTableIdentifier.getTableName)
       carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName)
@@ -1167,11 +1169,12 @@ private[sql] case class LoadTable(
           carbonLoadModel.setDirectLoad(true)
         }
         GlobalDictionaryUtil
-          .generateGlobalDictionary(sqlContext, carbonLoadModel, relation.tableMeta.storePath)
+          .generateGlobalDictionary(sqlContext, carbonLoadModel, relation.tableMeta.storePath,
+            dataFrame)
         CarbonDataRDDFactory
           .loadCarbonData(sqlContext, carbonLoadModel, storeLocation, relation.tableMeta.storePath,
             kettleHomePath,
-            relation.tableMeta.partitioner, columinar, isAgg = false, partitionStatus)
+            relation.tableMeta.partitioner, columinar, isAgg = false, partitionStatus, dataFrame)
       }
       catch {
         case ex: Exception =>

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0018756d/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
index d8e23bd..4817612 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
@@ -227,7 +227,7 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
       case ShowLoadsCommand(databaseName, table, limit) =>
         ExecutedCommand(ShowLoads(databaseName, table, limit, plan.output)) :: Nil
       case LoadTable(databaseNameOp, tableName, factPathFromUser, dimFilesPath,
-      partionValues, isOverwriteExist, inputSqlString) =>
+      partionValues, isOverwriteExist, inputSqlString, _) =>
         val isCarbonTable = CarbonEnv.getInstance(sqlContext).carbonCatalog
             .tableExists(TableIdentifier(tableName, databaseNameOp))(sqlContext)
         if (isCarbonTable || partionValues.nonEmpty) {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0018756d/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
new file mode 100644
index 0000000..245a5e8
--- /dev/null
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.dataload
+
+import org.apache.spark.sql.{DataFrame, Row, SaveMode}
+import org.apache.spark.sql.common.util.CarbonHiveContext._
+import org.apache.spark.sql.common.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+class TestLoadDataFrame extends QueryTest with BeforeAndAfterAll {
+  var df: DataFrame = _
+
+  def buildTestData() = {
+    import implicits._
+    df = sc.parallelize(1 to 1000)
+      .map(x => ("a", "b", x))
+      .toDF("c1", "c2", "c3")
+  }
+
+  def dropTable() = {
+    sql("DROP TABLE IF EXISTS carbon1")
+    sql("DROP TABLE IF EXISTS carbon2")
+    sql("DROP TABLE IF EXISTS carbon3")
+  }
+
+
+
+  override def beforeAll {
+    dropTable
+    buildTestData
+  }
+
+  test("test load dataframe with saving compressed csv files") {
+    // save dataframe to carbon file
+    df.write
+      .format("carbondata")
+      .option("tableName", "carbon1")
+      .option("tempCSV", "true")
+      .option("compress", "true")
+      .mode(SaveMode.Overwrite)
+      .save()
+    checkAnswer(
+      sql("select count(*) from carbon1 where c3 > 500"), Row(500)
+    )
+  }
+
+  test("test load dataframe with saving csv uncompressed files") {
+    // save dataframe to carbon file
+    df.write
+      .format("carbondata")
+      .option("tableName", "carbon2")
+      .option("tempCSV", "true")
+      .option("compress", "false")
+      .mode(SaveMode.Overwrite)
+      .save()
+    checkAnswer(
+      sql("select count(*) from carbon2 where c3 > 500"), Row(500)
+    )
+  }
+
+  test("test load dataframe without saving csv files") {
+    // save dataframe to carbon file
+    df.write
+      .format("carbondata")
+      .option("tableName", "carbon3")
+      .option("tempCSV", "false")
+      .mode(SaveMode.Overwrite)
+      .save()
+    checkAnswer(
+      sql("select count(*) from carbon3 where c3 > 500"), Row(500)
+    )
+  }
+
+  override def afterAll {
+    dropTable
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0018756d/processing/src/main/java/org/apache/carbondata/processing/api/dataloader/DataLoadModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/api/dataloader/DataLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/api/dataloader/DataLoadModel.java
index 42a8382..739035a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/api/dataloader/DataLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/api/dataloader/DataLoadModel.java
@@ -65,6 +65,8 @@ public class DataLoadModel {
 
   private String commentCharacter;
 
+  private String rddIteratorKey;
+
   private String maxColumns;
   /**
    * @return Returns the schemaInfo.
@@ -226,5 +228,13 @@ public class DataLoadModel {
   public void setMaxColumns(String maxColumns) {
     this.maxColumns = maxColumns;
   }
+
+  public String getRddIteratorKey() {
+    return rddIteratorKey;
+  }
+
+  public void setRddIteratorKey(String rddIteratorKey) {
+    this.rddIteratorKey = rddIteratorKey;
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0018756d/processing/src/main/java/org/apache/carbondata/processing/csvload/DataGraphExecuter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/csvload/DataGraphExecuter.java b/processing/src/main/java/org/apache/carbondata/processing/csvload/DataGraphExecuter.java
index 680cb37..7ec4f1b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/csvload/DataGraphExecuter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/csvload/DataGraphExecuter.java
@@ -138,7 +138,7 @@ public class DataGraphExecuter {
       String partitionId, CarbonDataLoadSchema schema) throws DataLoadingException {
 
     //This Method will validate the both fact and dimension csv files.
-    if (!schemaInfo.isAutoAggregateRequest()) {
+    if (!schemaInfo.isAutoAggregateRequest() && model.getRddIteratorKey() == null ) {
       validateCSVFiles(schemaInfo, partitionId, schema);
     }
     execute(graphFilePath, measureColumns, schemaInfo);
@@ -367,7 +367,7 @@ public class DataGraphExecuter {
       StringBuilder builder, StringBuilder measuresInCSVFile) throws DataLoadingException {
     for (StepMeta step : stepsMeta) {
       if (step.getStepMetaInterface() instanceof CsvInputMeta) {
-        if (null != model.getCsvFilePath()) {
+        if (null != model.getCsvFilePath() && model.getRddIteratorKey() == null) {
           CarbonFile csvFileToRead = GraphExecutionUtil.getCsvFileToRead(model.getCsvFilePath());
           TextFileInputField[] inputFields = GraphExecutionUtil
               .getTextInputFiles(csvFileToRead, measureColumns, builder, measuresInCSVFile, ",");

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0018756d/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/CsvInput.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/CsvInput.java b/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/CsvInput.java
index 8db8ed4..cf861bc 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/CsvInput.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/CsvInput.java
@@ -22,6 +22,7 @@ package org.apache.carbondata.processing.csvreaderstep;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
@@ -38,6 +39,7 @@ import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
 import org.apache.carbondata.processing.graphgenerator.GraphGenerator;
 
+import org.apache.commons.lang3.StringUtils;
 import org.pentaho.di.core.Const;
 import org.pentaho.di.core.exception.KettleException;
 import org.pentaho.di.core.logging.LogChannelInterface;
@@ -85,6 +87,11 @@ public class CsvInput extends BaseStep implements StepInterface {
 
   private ExecutorService exec;
 
+  /**
+   * If rddIteratorKey is not null, read data from RDD
+   */
+  private String rddIteratorKey = null;
+
   public CsvInput(StepMeta stepMeta, StepDataInterface stepDataInterface, int copyNr,
       TransMeta transMeta, Trans trans) {
     super(stepMeta, stepDataInterface, copyNr, transMeta, trans);
@@ -332,6 +339,8 @@ public class CsvInput extends BaseStep implements StepInterface {
           data.rownumFieldIndex++;
         }
       }
+      rddIteratorKey = StringUtils.isEmpty(meta.getRddIteratorKey()) ? null : meta
+              .getRddIteratorKey();
     }
 
     // start multi-thread to process
@@ -343,41 +352,57 @@ public class CsvInput extends BaseStep implements StepInterface {
     } catch (NumberFormatException exc) {
       numberOfNodes = NUM_CORES_DEFAULT_VAL;
     }
+    if (rddIteratorKey == null) {
+      BlockDetails[] blocksInfo = GraphGenerator.blockInfo.get(meta.getBlocksID());
+      if (blocksInfo.length == 0) {
+        //if isDirectLoad = true, and partition number > file num
+        //then blocksInfo will get empty in some partition processing, so just return
+        setOutputDone();
+        return false;
+      }
 
-    BlockDetails[] blocksInfo = GraphGenerator.blockInfo.get(meta.getBlocksID());
-    if (blocksInfo.length == 0) {
-      //if isDirectLoad = true, and partition number > file num
-      //then blocksInfo will get empty in some partition processing, so just return
-      setOutputDone();
-      return false;
-    }
-
-    if (numberOfNodes > blocksInfo.length) {
-      numberOfNodes = blocksInfo.length;
-    }
+      if (numberOfNodes > blocksInfo.length) {
+        numberOfNodes = blocksInfo.length;
+      }
 
-    //new the empty lists
-    for (int pos = 0; pos < numberOfNodes; pos++) {
-      threadBlockList.add(new ArrayList<BlockDetails>());
-    }
+      //new the empty lists
+      for (int pos = 0; pos < numberOfNodes; pos++) {
+        threadBlockList.add(new ArrayList<BlockDetails>());
+      }
 
-    //block balance to every thread
-    for (int pos = 0; pos < blocksInfo.length; ) {
-      for (int threadNum = 0; threadNum < numberOfNodes; threadNum++) {
-        if (pos < blocksInfo.length) {
-          threadBlockList.get(threadNum).add(blocksInfo[pos++]);
+      //block balance to every thread
+      for (int pos = 0; pos < blocksInfo.length; ) {
+        for (int threadNum = 0; threadNum < numberOfNodes; threadNum++) {
+          if (pos < blocksInfo.length) {
+            threadBlockList.get(threadNum).add(blocksInfo[pos++]);
+          }
         }
       }
+      LOGGER.info("*****************Started all csv reading***********");
+      startProcess(numberOfNodes);
+      LOGGER.info("*****************Completed all csv reading***********");
+      CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordCsvInputStepTime(
+              meta.getPartitionID(), System.currentTimeMillis());
+    } else {
+      scanRddIterator();
     }
-    LOGGER.info("*****************Started all csv reading***********");
-    startProcess(numberOfNodes);
-    LOGGER.info("*****************Completed all csv reading***********");
-    CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordCsvInputStepTime(
-        meta.getPartitionID(), System.currentTimeMillis());
     setOutputDone();
     return false;
   }
 
+  private void scanRddIterator() throws RuntimeException {
+    Iterator<String[]> iterator = RddInputUtils.getAndRemove(rddIteratorKey);
+    if (iterator != null) {
+      try{
+        while(iterator.hasNext()){
+          putRow(data.outputRowMeta, iterator.next());
+        }
+      } catch (KettleException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
   private void startProcess(final int numberOfNodes) throws RuntimeException {
     exec = Executors.newFixedThreadPool(numberOfNodes);
 
@@ -488,7 +513,7 @@ public class CsvInput extends BaseStep implements StepInterface {
       if (getTransMeta().findNrPrevSteps(getStepMeta()) == 0) {
         String filename = environmentSubstitute(meta.getFilename());
 
-        if (Const.isEmpty(filename)) {
+        if (Const.isEmpty(filename) && Const.isEmpty(meta.getRddIteratorKey())) {
           logError(BaseMessages.getString(PKG, "CsvInput.MissingFilename.Message")); //$NON-NLS-1$
           return false;
         }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0018756d/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/CsvInputMeta.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/CsvInputMeta.java b/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/CsvInputMeta.java
index 3e2b621..b4ae863 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/CsvInputMeta.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/CsvInputMeta.java
@@ -103,6 +103,8 @@ public class CsvInputMeta extends BaseStepMeta
 
   private String commentCharacter;
 
+  private String rddIteratorKey;
+
   private String maxColumns;
 
   public CsvInputMeta() {
@@ -127,6 +129,7 @@ public class CsvInputMeta extends BaseStepMeta
     escapeCharacter ="\\";
     quoteCharacter = "\"";
     commentCharacter = "#";
+    rddIteratorKey = "";
   }
 
   private void readData(Node stepnode) throws KettleXMLException {
@@ -165,6 +168,7 @@ public class CsvInputMeta extends BaseStepMeta
       escapeCharacter = XMLHandler.getTagValue(stepnode, "escapeCharacter");
       quoteCharacter = XMLHandler.getTagValue(stepnode, "quoteCharacter");
       commentCharacter = XMLHandler.getTagValue(stepnode, "commentCharacter");
+      rddIteratorKey = XMLHandler.getTagValue(stepnode, "rddIteratorKey");
       maxColumns = XMLHandler.getTagValue(stepnode, "maxColumns");
       Node fields = XMLHandler.getSubNode(stepnode, getXmlCode("FIELDS"));
       int nrfields = XMLHandler.countNodes(fields, getXmlCode("FIELD"));
@@ -230,6 +234,7 @@ public class CsvInputMeta extends BaseStepMeta
     retval.append("    ").append(XMLHandler.addTagValue("escapeCharacter", escapeCharacter));
     retval.append("    ").append(XMLHandler.addTagValue("quoteCharacter", quoteCharacter));
     retval.append("    ").append(XMLHandler.addTagValue("commentCharacter", commentCharacter));
+    retval.append("    ").append(XMLHandler.addTagValue("rddIteratorKey", rddIteratorKey));
     retval.append("    ").append(XMLHandler.addTagValue("maxColumns", maxColumns));
     retval.append("    ").append(XMLHandler.openTag(getXmlCode("FIELDS"))).append(Const.CR);
     for (int i = 0; i < inputFields.length; i++) {
@@ -285,6 +290,7 @@ public class CsvInputMeta extends BaseStepMeta
       escapeCharacter = rep.getStepAttributeString(idStep, getRepCode("escapeCharacter"));
       quoteCharacter = rep.getStepAttributeString(idStep, getRepCode("quoteCharacter"));
       commentCharacter = rep.getStepAttributeString(idStep, getRepCode("commentCharacter"));
+      rddIteratorKey = rep.getStepAttributeString(idStep, getRepCode("rddIteratorKey"));
       maxColumns = rep.getStepAttributeString(idStep, getRepCode("maxColumns"));
       int nrfields = rep.countNrStepAttributes(idStep, getRepCode("FIELD_NAME"));
 
@@ -345,6 +351,8 @@ public class CsvInputMeta extends BaseStepMeta
           quoteCharacter);
       rep.saveStepAttribute(idTransformation, idStep, getRepCode("commentCharacter"),
           commentCharacter);
+      rep.saveStepAttribute(idTransformation, idStep, getRepCode("rddIteratorKey"),
+              rddIteratorKey);
       rep.saveStepAttribute(idTransformation, idStep, getRepCode("maxColumns"),
           maxColumns);
       for (int i = 0; i < inputFields.length; i++) {
@@ -858,6 +866,8 @@ public class CsvInputMeta extends BaseStepMeta
           quoteCharacter = (String) entry.getValue();
         } else if ("commentCharacter".equals(attributeKey)) {
           commentCharacter = (String) entry.getValue();
+        } else if ("rddIteratorKey".equals(attributeKey)) {
+          rddIteratorKey = (String) entry.getValue();
         } else {
           throw new RuntimeException(
               "Unhandled metadata injection of attribute: " + attr.toString() + " - " + attr
@@ -953,4 +963,11 @@ public class CsvInputMeta extends BaseStepMeta
   public void setMaxColumns(String maxColumns) {
     this.maxColumns = maxColumns;
   }
+
+  public String getRddIteratorKey() {
+    return this.rddIteratorKey;
+  }
+  public void setRddIteratorKey(String rddIteratorKey) {
+    this.rddIteratorKey = rddIteratorKey;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0018756d/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/RddInputUtils.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/RddInputUtils.java b/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/RddInputUtils.java
new file mode 100644
index 0000000..f9a0429
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/RddInputUtils.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.processing.csvreaderstep;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+public class RddInputUtils {
+  private static Map<String, Iterator<String[]>> iteratorMap = new HashMap<String,
+      Iterator<String[]>>();
+
+  public static void put(String key, Iterator<String[]> value) {
+    iteratorMap.put(key, value);
+  }
+
+  public static Iterator<String[]> getAndRemove(String key) {
+    Iterator<String[]> iter = iteratorMap.get(key);
+    remove(key);
+    return iter;
+  }
+
+  public static void remove(String key) {
+    iteratorMap.remove(key);
+  }
+}

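RddInputUtils is a process-local handoff point: the loader puts a partition's row iterator
under a key, and the CSV input step retrieves it once with getAndRemove(). A hypothetical
usage sketch follows; the key and the row values are made up for illustration and do not
reflect how the real loader names its keys.

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.carbondata.processing.csvreaderstep.RddInputUtils;

public class RddInputUtilsUsageSketch {
  public static void main(String[] args) {
    // Illustrative rows and key, made up for this example.
    List<String[]> rows = Arrays.asList(
        new String[] {"1", "bob", "10"},
        new String[] {"2", "alice", "20"});
    String key = "load_1_partition_0";

    // Producer side (Spark loader): register the iterator before the graph runs.
    RddInputUtils.put(key, rows.iterator());

    // Consumer side (CsvInput.scanRddIterator): retrieve and drain it exactly once.
    Iterator<String[]> it = RddInputUtils.getAndRemove(key);
    while (it != null && it.hasNext()) {
      System.out.println(String.join(",", it.next()));
    }

    // getAndRemove() drops the entry, so a second lookup returns null.
    System.out.println(RddInputUtils.getAndRemove(key));  // prints: null
  }
}

Because the registry is a plain static HashMap, the design assumes one writer and one reader
per key within the same JVM; if iterators for several partitions were ever registered
concurrently, a ConcurrentHashMap or external synchronization would be the safer choice.
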
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0018756d/processing/src/main/java/org/apache/carbondata/processing/dataprocessor/DataProcessTaskStatus.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/dataprocessor/DataProcessTaskStatus.java b/processing/src/main/java/org/apache/carbondata/processing/dataprocessor/DataProcessTaskStatus.java
index a0fb157..e8581ad 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/dataprocessor/DataProcessTaskStatus.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/dataprocessor/DataProcessTaskStatus.java
@@ -91,6 +91,8 @@ public class DataProcessTaskStatus implements IDataProcessStatus, Serializable {
 
   private String commentCharacter;
 
+  private String rddIteratorKey;
+
   public DataProcessTaskStatus(String databaseName, String tableName) {
     this.databaseName = databaseName;
     this.tableName = tableName;
@@ -297,4 +299,12 @@ public class DataProcessTaskStatus implements IDataProcessStatus, Serializable {
   public void setCommentCharacter(String commentCharacter) {
     this.commentCharacter = commentCharacter;
   }
+
+  public String getRddIteratorKey() {
+    return rddIteratorKey;
+  }
+
+  public void setRddIteratorKey(String rddIteratorKey) {
+    this.rddIteratorKey = rddIteratorKey;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0018756d/processing/src/main/java/org/apache/carbondata/processing/dataprocessor/IDataProcessStatus.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/dataprocessor/IDataProcessStatus.java b/processing/src/main/java/org/apache/carbondata/processing/dataprocessor/IDataProcessStatus.java
index 56dfe00..efd4ba8 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/dataprocessor/IDataProcessStatus.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/dataprocessor/IDataProcessStatus.java
@@ -193,4 +193,6 @@ public interface IDataProcessStatus {
   String getQuoteCharacter();
 
   String getCommentCharacter();
+
+  String getRddIteratorKey();
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0018756d/processing/src/main/java/org/apache/carbondata/processing/graphgenerator/GraphGenerator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/graphgenerator/GraphGenerator.java b/processing/src/main/java/org/apache/carbondata/processing/graphgenerator/GraphGenerator.java
index 4c2c178..f939f48 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/graphgenerator/GraphGenerator.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/graphgenerator/GraphGenerator.java
@@ -204,6 +204,8 @@ public class GraphGenerator {
    */
   private String maxColumns;
 
+  private String rddIteratorKey;
+
   public GraphGenerator(DataLoadModel dataLoadModel, boolean isHDFSReadMode, String partitionID,
       String factStoreLocation, int allocate,
       CarbonDataLoadSchema carbonDataLoadSchema, String segmentId) {
@@ -237,6 +239,7 @@ public class GraphGenerator {
     this(dataLoadModel, isHDFSReadMode, partitionID, factStoreLocation, allocate,
         carbonDataLoadSchema, segmentId);
     this.outputLocation = outputLocation;
+    this.rddIteratorKey = dataLoadModel.getRddIteratorKey();
   }
 
   /**
@@ -456,6 +459,7 @@ public class GraphGenerator {
     csvInputMeta.setEscapeCharacter(this.escapeCharacter);
     csvInputMeta.setQuoteCharacter(this.quoteCharacter);
     csvInputMeta.setCommentCharacter(this.commentCharacter);
+    csvInputMeta.setRddIteratorKey(this.rddIteratorKey == null ? "" : this.rddIteratorKey);
     csvDataStep.setDraw(true);
     csvDataStep.setDescription("Read raw data from " + GraphGeneratorConstants.CSV_INPUT);
 

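Taken together, the changes thread one string through the whole load path: the loader registers
the iterator in RddInputUtils and places the key on DataLoadModel, GraphGenerator copies it into
CsvInputMeta when building the graph (the hunk above), and CsvInput resolves it back through
RddInputUtils at run time. A rough producer-side sketch of that wiring follows; the
setRddIteratorKey() setter and the no-argument DataLoadModel constructor are assumed here to
mirror the getRddIteratorKey() call used by GraphGenerator, and the UUID key scheme is
illustrative only.

import java.util.Iterator;
import java.util.UUID;

import org.apache.carbondata.processing.api.dataloader.DataLoadModel;
import org.apache.carbondata.processing.csvreaderstep.RddInputUtils;

public class RddLoadWiringSketch {
  static DataLoadModel prepare(Iterator<String[]> partitionRows) {
    String key = UUID.randomUUID().toString();   // illustrative key scheme

    // 1. Register the partition's rows so the CSV input step can find them later.
    RddInputUtils.put(key, partitionRows);

    // 2. Carry the key on the load model; GraphGenerator reads it via getRddIteratorKey()
    //    and forwards it to CsvInputMeta (setter name and constructor assumed).
    DataLoadModel model = new DataLoadModel();
    model.setRddIteratorKey(key);

    // 3. From here the existing graph execution takes over: CsvInput sees the key on its
    //    meta and calls RddInputUtils.getAndRemove(key) instead of opening CSV files.
    return model;
  }
}
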
