carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ravipes...@apache.org
Subject [1/2] incubator-carbondata git commit: merge from master
Date Sun, 11 Dec 2016 03:59:28 GMT
Repository: incubator-carbondata
Updated Branches:
  refs/heads/master 98564f04d -> 26e3e0023


merge from master

reset maven-source-plugin

remove comments on createTableFromThrift and rais jira later

spark streaming dataframe support

reset maven-source-plugin

add comments for NewDataFrameLoaderRDD and NewRddIterator

format comments


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/6c9194d9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/6c9194d9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/6c9194d9

Branch: refs/heads/master
Commit: 6c9194d97c54351434866f423ef44907b887ae5a
Parents: 98564f0
Author: WilliamZhu <allwefantasy@gmail.com>
Authored: Tue Dec 6 12:39:22 2016 +0800
Committer: ravipesala <ravi.pesala@gmail.com>
Committed: Sun Dec 11 09:27:51 2016 +0530

----------------------------------------------------------------------
 .../apache/carbondata/spark/CarbonOption.scala  |   2 +
 .../spark/rdd/NewCarbonDataLoadRDD.scala        | 167 +++++++++++++++++++
 .../spark/CarbonDataFrameWriter.scala           |   2 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |  52 ++++--
 .../execution/command/carbonTableSchema.scala   |   5 +-
 .../apache/spark/sql/hive/CarbonMetastore.scala |   6 +-
 .../apache/carbondata/spark/CarbonOption.scala  |   2 +
 .../sort/impl/ParallelReadMergeSorterImpl.java  |   3 +
 .../util/CarbonDataProcessorUtil.java           |  31 ++++
 9 files changed, 247 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6c9194d9/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
index a0503c7..213712e 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
@@ -45,4 +45,6 @@ class CarbonOption(options: Map[String, String]) {
   def compress: Boolean = options.getOrElse("compress", "false").toBoolean
 
   def useKettle: Boolean = options.getOrElse("useKettle", "true").toBoolean
+
+  def toMap: Map[String, String] = options
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6c9194d9/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 32770f7..96bb5ed 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -30,6 +30,8 @@ import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskType}
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 import org.apache.spark.{Partition, SparkContext, TaskContext}
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.execution.command.Partitioner
 
 import org.apache.carbondata.common.CarbonIterator
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -308,3 +310,168 @@ class NewCarbonDataLoadRDD[K, V](
     }
   }
 }
+
+/**
+ *  It loads the data to carbon from spark DataFrame using
+ *  @see org.apache.carbondata.processing.newflow.DataLoadExecutor without
+ *  kettle requirement
+ */
+class NewDataFrameLoaderRDD[K, V](
+                                   sc: SparkContext,
+                                   result: DataLoadResult[K, V],
+                                   carbonLoadModel: CarbonLoadModel,
+                                   loadCount: Integer,
+                                   tableCreationTime: Long,
+                                   schemaLastUpdatedTime: Long,
+                                   prev: RDD[Row]) extends RDD[(K, V)](prev) {
+
+
+  override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+    val iter = new Iterator[(K, V)] {
+      var partitionID = "0"
+      val loadMetadataDetails = new LoadMetadataDetails()
+      var model: CarbonLoadModel = carbonLoadModel
+      var uniqueLoadStatusId =
+        carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE + theSplit.index
+      try {
+
+        loadMetadataDetails.setPartitionCount(partitionID)
+        loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)
+        carbonLoadModel.setPartitionId(partitionID)
+        carbonLoadModel.setSegmentId(String.valueOf(loadCount))
+        carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
+
+        val iterator = new NewRddIterator(
+          firstParent[Row].iterator(theSplit, context),
+          carbonLoadModel)
+
+        class CarbonIteratorImpl(iterator: util.Iterator[Array[AnyRef]])
+          extends CarbonIterator[Array[AnyRef]] {
+          override def initialize(): Unit = {}
+
+          override def close(): Unit = {}
+
+          override def next(): Array[AnyRef] = {
+            iterator.next
+          }
+
+          override def hasNext: Boolean = {
+            iterator.hasNext
+          }
+        }
+
+
+        val recordReaders: Array[CarbonIterator[Array[AnyRef]]] =
+          Array(new CarbonIteratorImpl(iterator))
+
+        val loader = new SparkPartitionLoader(model,
+          theSplit.index,
+          null,
+          null,
+          loadCount,
+          loadMetadataDetails)
+        // Intialize to set carbon properties
+        loader.initialize()
+
+        loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
+        new DataLoadExecutor().execute(model, loader.storeLocation, recordReaders)
+
+      } catch {
+        case e: BadRecordFoundException =>
+          loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
+          logInfo("Bad Record Found")
+        case e: Exception =>
+          logInfo("DataLoad failure", e)
+          LOGGER.error(e)
+          throw e
+      }
+      var finished = false
+
+      override def hasNext: Boolean = !finished
+
+      override def next(): (K, V) = {
+        finished = true
+        result.getKey(uniqueLoadStatusId, loadMetadataDetails)
+      }
+    }
+    iter
+  }
+  override protected def getPartitions: Array[Partition] = firstParent[Row].partitions
+}
+
+/**
+ * This class wrap Scala's Iterator to Java's Iterator.
+ * It also convert all columns to string data since carbondata will recognize the right type
+ * according to schema from spark DataFrame.
+ * @see org.apache.carbondata.spark.rdd.RddIterator
+ * @param rddIter
+ * @param carbonLoadModel
+ */
+class NewRddIterator(rddIter: Iterator[Row],
+                     carbonLoadModel: CarbonLoadModel) extends java.util.Iterator[Array[AnyRef]] {
+  val formatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
+    .CARBON_TIMESTAMP_FORMAT, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+  val format = new SimpleDateFormat(formatString)
+  val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
+  val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
+
+  def hasNext: Boolean = rddIter.hasNext
+
+  private def getString(value: Any, level: Int = 1): String = {
+    if (value == null) {
+      ""
+    } else {
+      value match {
+        case s: String => s
+        case i: java.lang.Integer => i.toString
+        case d: java.lang.Double => d.toString
+        case t: java.sql.Timestamp => format format t
+        case d: java.sql.Date => format format d
+        case d: java.math.BigDecimal => d.toPlainString
+        case b: java.lang.Boolean => b.toString
+        case s: java.lang.Short => s.toString
+        case f: java.lang.Float => f.toString
+        case bs: Array[Byte] => new String(bs)
+        case s: scala.collection.Seq[Any] =>
+          val delimiter = if (level == 1) {
+            delimiterLevel1
+          } else {
+            delimiterLevel2
+          }
+          val builder = new StringBuilder()
+          s.foreach { x =>
+            builder.append(getString(x, level + 1)).append(delimiter)
+          }
+          builder.substring(0, builder.length - 1)
+        case m: scala.collection.Map[Any, Any] =>
+          throw new Exception("Unsupported data type: Map")
+        case r: org.apache.spark.sql.Row =>
+          val delimiter = if (level == 1) {
+            delimiterLevel1
+          } else {
+            delimiterLevel2
+          }
+          val builder = new StringBuilder()
+          for (i <- 0 until r.length) {
+            builder.append(getString(r(i), level + 1)).append(delimiter)
+          }
+          builder.substring(0, builder.length - 1)
+        case other => other.toString
+      }
+    }
+  }
+
+  def next: Array[AnyRef] = {
+    val row = rddIter.next()
+    val columns = new Array[Object](row.length)
+    for (i <- 0 until row.length) {
+      columns(i) = getString(row(i))
+    }
+    columns
+  }
+
+  def remove(): Unit = {
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6c9194d9/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
index c464538..41595d5 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
@@ -128,7 +128,7 @@ class CarbonDataFrameWriter(val dataFrame: DataFrame) {
       options.tableName,
       null,
       Seq(),
-      Map("fileheader" -> header),
+      Map("fileheader" -> header) ++ options.toMap,
       isOverwriteExist = false,
       null,
       Some(dataFrame)).run(cc)

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6c9194d9/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 8463477..53a5f67 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -634,23 +634,41 @@ object CarbonDataRDDFactory {
       def loadDataFrame(): Unit = {
         try {
           val rdd = dataFrame.get.rdd
-          val nodeNumOfData = rdd.partitions.flatMap[String, Array[String]]{ p =>
-            DataLoadPartitionCoalescer.getPreferredLocs(rdd, p).map(_.host)
-          }.distinct.size
-          val nodes = DistributionUtil.ensureExecutorsByNumberAndGetNodeList(nodeNumOfData,
-            sqlContext.sparkContext)
-          val newRdd = new DataLoadCoalescedRDD[Row](rdd, nodes.toArray.distinct)
-
-          status = new DataFrameLoaderRDD(sqlContext.sparkContext,
-            new DataLoadResultImpl(),
-            carbonLoadModel,
-            storePath,
-            kettleHomePath,
-            columnar,
-            currentLoadCount,
-            tableCreationTime,
-            schemaLastUpdatedTime,
-            newRdd).collect()
+
+          if (useKettle) {
+
+            val nodeNumOfData = rdd.partitions.flatMap[String, Array[String]]{ p =>
+              DataLoadPartitionCoalescer.getPreferredLocs(rdd, p).map(_.host)
+            }.distinct.size
+            val nodes = DistributionUtil.ensureExecutorsByNumberAndGetNodeList(nodeNumOfData,
+              sqlContext.sparkContext)
+            val newRdd = new DataLoadCoalescedRDD[Row](rdd, nodes.toArray.distinct)
+
+            status = new DataFrameLoaderRDD(sqlContext.sparkContext,
+              new DataLoadResultImpl(),
+              carbonLoadModel,
+              storePath,
+              kettleHomePath,
+              columnar,
+              currentLoadCount,
+              tableCreationTime,
+              schemaLastUpdatedTime,
+              newRdd).collect()
+          } else {
+
+            var numPartitions = DistributionUtil.getNodeList(sqlContext.sparkContext).length
+            numPartitions = Math.max(1, Math.min(numPartitions, rdd.partitions.length))
+            val coalesceRdd = rdd.coalesce(numPartitions, shuffle = false)
+
+            status = new NewDataFrameLoaderRDD(sqlContext.sparkContext,
+              new DataLoadResultImpl(),
+              carbonLoadModel,
+              currentLoadCount,
+              tableCreationTime,
+              schemaLastUpdatedTime,
+              coalesceRdd).collect()
+          }
+
         } catch {
           case ex: Exception =>
             LOGGER.error(ex, "load data frame failed")

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6c9194d9/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 39d0841..779bb22 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -434,10 +434,9 @@ case class LoadTable(
 
 
       val columinar = sqlContext.getConf("carbon.is.columnar.storage", "true").toBoolean
-      val kettleHomePath = CarbonScalaUtil.getKettleHome(sqlContext)
 
       // TODO It will be removed after kettle is removed.
-      val useKettle = options.get("use_kettle") match {
+      val useKettle = options.get("useKettle") match {
         case Some(value) => value.toBoolean
         case _ =>
           val useKettleLocal = System.getProperty("use.kettle")
@@ -448,6 +447,8 @@ case class LoadTable(
           }
       }
 
+      val kettleHomePath = if (useKettle) CarbonScalaUtil.getKettleHome(sqlContext) else ""
+
       val delimiter = options.getOrElse("delimiter", ",")
       val quoteChar = options.getOrElse("quotechar", "\"")
       val fileHeader = options.getOrElse("fileheader", "")

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6c9194d9/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
index bee891c..b065850 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
@@ -18,14 +18,13 @@
 package org.apache.spark.sql.hive
 
 import java.io._
-import java.util.{GregorianCalendar, UUID}
+import java.util.UUID
 
 import scala.Array.canBuildFrom
 import scala.collection.mutable.ArrayBuffer
 import scala.language.implicitConversions
 import scala.util.parsing.combinator.RegexParsers
 
-import org.apache.spark
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
@@ -42,7 +41,6 @@ import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.carbon.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.core.carbon.querystatistics.{QueryStatistic, QueryStatisticsConstants}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile
 import org.apache.carbondata.core.datastorage.store.impl.FileFactory
 import org.apache.carbondata.core.datastorage.store.impl.FileFactory.FileType
 import org.apache.carbondata.core.reader.ThriftReader
@@ -293,10 +291,12 @@ class CarbonMetastore(hiveContext: HiveContext, val storePath: String,
     if (!FileFactory.isFileExist(schemaMetadataPath, fileType)) {
       FileFactory.mkdirs(schemaMetadataPath, fileType)
     }
+
     val thriftWriter = new ThriftWriter(schemaFilePath, false)
     thriftWriter.open()
     thriftWriter.write(thriftTableInfo)
     thriftWriter.close()
+
     metadata.tablesMeta += tableMeta
     logInfo(s"Table $tableName for Database $dbName created successfully.")
     LOGGER.info(s"Table $tableName for Database $dbName created successfully.")

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6c9194d9/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
index 5f0c7e3..b02d467 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
@@ -45,4 +45,6 @@ class CarbonOption(options: Map[String, String]) {
   def compress: Boolean = options.getOrElse("compress", "false").toBoolean
 
   def useKettle: Boolean = options.getOrElse("useKettle", "true").toBoolean
+
+  def toMap: Map[String, String] = options
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6c9194d9/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
index e2e995c..5d12ec4 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
@@ -54,6 +54,8 @@ public class ParallelReadMergeSorterImpl implements Sorter {
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(ParallelReadMergeSorterImpl.class.getName());
 
+  private static final Object taskContext = CarbonDataProcessorUtil.fetchTaskContext();
+
   private SortParameters sortParameters;
 
   private SortIntermediateFileMerger intermediateFileMerger;
@@ -200,6 +202,7 @@ public class ParallelReadMergeSorterImpl implements Sorter {
     @Override
     public Void call() throws CarbonDataLoadingException {
       try {
+        CarbonDataProcessorUtil.configureTaskContext(taskContext);
         while (iterator.hasNext()) {
           CarbonRowBatch batch = iterator.next();
           Iterator<CarbonRow> batchIterator = batch.getBatchIterator();

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6c9194d9/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index eff59e5..3cb984d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -25,6 +25,8 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -604,4 +606,33 @@ public final class CarbonDataProcessorUtil {
     }
     return dateformatsHashMap;
   }
+
+  /**
+   * Maybe we can extract interfaces later to support task context in hive ,spark
+   */
+  public static Object fetchTaskContext() {
+    try {
+      return Class.forName("org.apache.spark.TaskContext").getMethod("get").invoke(null);
+    } catch (Exception e) {
+      //just ignore
+      LOGGER.info("org.apache.spark.TaskContext not found");
+      return null;
+    }
+  }
+
+  public static void configureTaskContext(Object context) {
+    try {
+      Class clazz = Class.forName("org.apache.spark.TaskContext$");
+      for (Method method : clazz.getDeclaredMethods()) {
+        if (method.getName().equals("setTaskContext")) {
+          Field field = clazz.getField("MODULE$");
+          Object instance = field.get(null);
+          method.invoke(instance, new Object[]{context});
+        }
+      }
+    } catch (Exception e) {
+      //just ignore
+      LOGGER.info("org.apache.spark.TaskContext not found");
+    }
+  }
 }
\ No newline at end of file


Mime
View raw message