carbondata-commits mailing list archives

From: chenliang...@apache.org
Subject: [1/2] incubator-carbondata git commit: fixDataLoadingIssue
Date: Thu, 05 Jan 2017 03:22:00 GMT
Repository: incubator-carbondata
Updated Branches:
  refs/heads/master f8ef805e9 -> f5df74066


fixDataLoadingIssue


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/0cfefbb4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/0cfefbb4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/0cfefbb4

Branch: refs/heads/master
Commit: 0cfefbb450b596da23f87a9cab65016c94f96a0a
Parents: f8ef805
Author: QiangCai <qiangcai@qq.com>
Authored: Thu Jan 5 11:03:25 2017 +0800
Committer: QiangCai <qiangcai@qq.com>
Committed: Thu Jan 5 11:03:25 2017 +0800

----------------------------------------------------------------------
 .../spark/util/GlobalDictionaryUtil.scala       |  4 +-
 .../AllDataTypesTestCaseAggregate.scala         |  1 +
 .../AutoHighCardinalityIdentifyTestCase.scala   |  8 ++--
 .../spark/sql/CarbonDataFrameWriter.scala       | 40 ++++++++++++++------
 4 files changed, 37 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0cfefbb4/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
index e87b27b..6877354 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -376,12 +376,12 @@ object GlobalDictionaryUtil {
       } else {
         carbonLoadModel.getCsvHeader
       }
-      val delimiter = if (carbonLoadModel.getCsvDelimiter == null) {
+      val delimiter = if (StringUtils.isEmpty(carbonLoadModel.getCsvDelimiter)) {
         CarbonCommonConstants.COMMA
       } else {
         carbonLoadModel.getCsvDelimiter
       }
-      val quote = if (carbonLoadModel.getQuoteChar == null) {
+      val quote = if (StringUtils.isEmpty(carbonLoadModel.getQuoteChar)) {
         "\""
       } else {
         carbonLoadModel.getQuoteChar
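
This hunk broadens the fallback check from a plain null test to StringUtils.isEmpty, so an empty-string delimiter or quote character coming from the load model also falls back to the default instead of being passed through to the CSV parser. A minimal sketch of the difference, assuming the StringUtils in scope is Apache Commons Lang's (the import is not shown in this hunk):

    import org.apache.commons.lang.StringUtils

    val fromModel = ""  // delimiter left empty in the load model
    // old check: "" is not null, so the empty string leaks through
    val oldDelimiter = if (fromModel == null) "," else fromModel           // yields ""
    // new check: isEmpty covers both null and "", so the default applies
    val newDelimiter = if (StringUtils.isEmpty(fromModel)) "," else fromModel  // yields ","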

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0cfefbb4/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala
index c6a1405..274477c 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala
@@ -1109,6 +1109,7 @@ class AllDataTypesTestCaseAggregate extends QueryTest with BeforeAndAfterAll {
      sql("select c1,count(c1) from (select c1 as c1,c2 as c2 from carbonunion union all select c2 as c1,c1 as c2 from carbonunion)t where c1='200' group by c1"),
      sql("select c1,count(c1) from (select c1 as c1,c2 as c2 from sparkunion union all select c2 as c1,c1 as c2 from sparkunion)t where c1='200' group by c1"))
     sql("drop table if exists carbonunion")
+    sql("drop table if exists sparkunion")
   })
 
   test("select Min(imei) from (select imei from Carbon_automation_test order by imei) t")({

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0cfefbb4/integration/spark/src/test/scala/org/apache/carbondata/spark/util/AutoHighCardinalityIdentifyTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/AutoHighCardinalityIdentifyTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/AutoHighCardinalityIdentifyTestCase.scala
index 5c61ab8..39f7683 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/AutoHighCardinalityIdentifyTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/AutoHighCardinalityIdentifyTestCase.scala
@@ -36,9 +36,6 @@ import org.apache.carbondata.processing.model.CarbonLoadModel
 
 /**
   * Test Case for org.apache.carbondata.spark.util.GlobalDictionaryUtil
-  *
-  * @date: Apr 10, 2016 10:34:58 PM
-  * @See org.apache.carbondata.spark.util.GlobalDictionaryUtil
   */
 class AutoHighCardinalityIdentifyTestCase extends QueryTest with BeforeAndAfterAll {
 
@@ -70,6 +67,11 @@ class AutoHighCardinalityIdentifyTestCase extends QueryTest with BeforeAndAfterA
     buildColGrpHighCardTable
   }
 
+  override def afterAll {
+    sql("drop table if exists highcard")
+    sql("drop table if exists colgrp_highcard")
+  }
+
   def buildTestData() = {
     val pwd = new File(this.getClass.getResource("/").getPath + "/../../").getCanonicalPath
     filePath = pwd + "/target/highcarddata.csv"
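
This hunk adds an afterAll hook so the highcard and colgrp_highcard tables created during setup are removed once the suite finishes. A minimal sketch of the ScalaTest lifecycle being used (the suite name is illustrative, not from the commit):

    import org.scalatest.BeforeAndAfterAll

    class ExampleSuite extends QueryTest with BeforeAndAfterAll {
      override def beforeAll {
        sql("drop table if exists highcard")  // start clean, then create and load
      }
      override def afterAll {
        sql("drop table if exists highcard")  // runs once, after the last test
      }
    }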

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0cfefbb4/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
index 3057bee..db4d2f3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql
 
 import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.compress.GzipCodec
 import org.apache.spark.sql.execution.command.LoadTable
 import org.apache.spark.sql.types._
 
@@ -54,7 +55,6 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
    * Firstly, saving DataFrame to CSV files
    * Secondly, load CSV files
    * @param options
-   * @param sqlContext
    */
   private def loadTempCSV(options: CarbonOption): Unit = {
     // temporary solution: write to csv file, then load the csv into carbon
@@ -91,17 +91,37 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
   }
 
   private def writeToTempCSVFile(tempCSVFolder: String, options: CarbonOption): Unit = {
-    var writer: DataFrameWriter[Row] =
-      dataFrame.write
-        .format(csvPackage)
-        .option("header", "false")
-        .mode(SaveMode.Overwrite)
+    val strRDD = dataFrame.rdd.mapPartitions { case iter =>
+      new Iterator[String] {
+        override def hasNext = iter.hasNext
+
+        def convertToCSVString(seq: Seq[Any]): String = {
+          val build = new java.lang.StringBuilder()
+          if (seq.head != null) {
+            build.append(seq.head.toString)
+          }
+          val itemIter = seq.tail.iterator
+          while (itemIter.hasNext) {
+            build.append(CarbonCommonConstants.COMMA)
+            val value = itemIter.next()
+            if (value != null) {
+              build.append(value.toString)
+            }
+          }
+          build.toString
+        }
 
-    if (options.compress) {
-      writer = writer.option("codec", "gzip")
+        override def next: String = {
+          convertToCSVString(iter.next.toSeq)
+        }
+      }
     }
 
-    writer.save(tempCSVFolder)
+    if (options.compress) {
+      strRDD.saveAsTextFile(tempCSVFolder, classOf[GzipCodec])
+    } else {
+      strRDD.saveAsTextFile(tempCSVFolder)
+    }
   }
 
   /**
@@ -121,8 +141,6 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
       Some(dataFrame)).run(sqlContext.sparkSession)
   }
 
-  private def csvPackage: String = "com.databricks.spark.csv.newapi"
-
   private def convertToCarbonType(sparkType: DataType): String = {
     sparkType match {
       case StringType => CarbonType.STRING.getName
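
The largest change is in writeToTempCSVFile: instead of routing the temporary CSV through the external com.databricks.spark.csv writer (whose "codec" option handled compression), the DataFrame's rows are now joined into comma-separated strings and written via RDD.saveAsTextFile, passing GzipCodec directly when options.compress is set. A self-contained sketch of the same technique (the method and value names here are illustrative, not from the commit):

    import org.apache.hadoop.io.compress.GzipCodec
    import org.apache.spark.sql.DataFrame

    def writeAsCsv(df: DataFrame, path: String, compress: Boolean): Unit = {
      val lines = df.rdd.map { row =>
        // Null fields become empty strings, matching convertToCSVString above.
        row.toSeq.map(v => if (v == null) "" else v.toString).mkString(",")
      }
      if (compress) lines.saveAsTextFile(path, classOf[GzipCodec])
      else lines.saveAsTextFile(path)
    }

Note that, like the committed code, this simple join does no quoting or escaping, so it assumes field values never contain the delimiter.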

