carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jack...@apache.org
Subject [1/2] incubator-carbondata git commit: fix spark2 compilation
Date Sat, 03 Dec 2016 00:41:02 GMT
Repository: incubator-carbondata
Updated Branches:
  refs/heads/master cffcb998a -> d5f409840


fix spark2 compilation


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/223cf9aa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/223cf9aa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/223cf9aa

Branch: refs/heads/master
Commit: 223cf9aa7f226705cf947b972f128d0c16604fc8
Parents: cffcb99
Author: jackylk <jacky.likun@huawei.com>
Authored: Fri Dec 2 22:09:33 2016 +0800
Committer: jackylk <jacky.likun@huawei.com>
Committed: Sat Dec 3 08:37:53 2016 +0800

----------------------------------------------------------------------
 .../spark/rdd/NewCarbonDataLoadRDD.scala        | 39 ++++++++++++--------
 1 file changed, 23 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/223cf9aa/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 914cdab..05ba3ac 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -32,11 +32,12 @@ import org.apache.spark.{Partition, SparkContext, TaskContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.execution.command.Partitioner
 
+import org.apache.carbondata.common.CarbonIterator
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.common.logging.impl.StandardLogService
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.load.{BlockDetails, LoadMetadataDetails}
-import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory}
 import org.apache.carbondata.hadoop.csv.CSVInputFormat
 import org.apache.carbondata.hadoop.csv.recorditerator.RecordReaderIterator
 import org.apache.carbondata.processing.model.CarbonLoadModel
@@ -168,9 +169,13 @@ class NewCarbonDataLoadRDD[K, V](
           throw e
       }
 
-      def getInputIterators: Array[util.Iterator[Array[AnyRef]]] = {
+      def getInputIterators: Array[CarbonIterator[Array[AnyRef]]] = {
         val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, theSplit.index, 0)
-        val configuration: Configuration = confBroadcast.value.value
+        var configuration: Configuration = confBroadcast.value.value
+        // Broadcast fails in some cases
+        if (configuration == null) {
+          configuration = new Configuration()
+        }
         configureCSVInputFormat(configuration)
         val hadoopAttemptContext = new TaskAttemptContextImpl(configuration, attemptId)
         val format = new CSVInputFormat
@@ -195,10 +200,11 @@ class NewCarbonDataLoadRDD[K, V](
               partitionID, split.partitionBlocksDetail.length)
           val readers =
           split.partitionBlocksDetail.map(format.createRecordReader(_, hadoopAttemptContext))
-          readers.zipWithIndex.foreach { case (reader, index) =>
-            reader.initialize(split.partitionBlocksDetail(index), hadoopAttemptContext)
+          readers.zipWithIndex.map { case (reader, index) =>
+            new RecordReaderIterator(reader,
+              split.partitionBlocksDetail(index),
+              hadoopAttemptContext)
           }
-          readers.map(new RecordReaderIterator(_))
         } else {
           // for node partition
           val split = theSplit.asInstanceOf[CarbonNodePartition]
@@ -220,21 +226,22 @@ class NewCarbonDataLoadRDD[K, V](
           StandardLogService.setThreadName(blocksID, null)
           val readers =
             split.nodeBlocksDetail.map(format.createRecordReader(_, hadoopAttemptContext))
-          readers.zipWithIndex.foreach { case (reader, index) =>
-            reader.initialize(split.nodeBlocksDetail(index), hadoopAttemptContext)
+          readers.zipWithIndex.map { case (reader, index) =>
+            new RecordReaderIterator(reader, split.nodeBlocksDetail(index), hadoopAttemptContext)
           }
-          readers.map(new RecordReaderIterator(_))
         }
       }
 
       def configureCSVInputFormat(configuration: Configuration): Unit = {
-        CSVInputFormat.setCommentCharacter(carbonLoadModel.getCommentChar, configuration)
-        CSVInputFormat.setCSVDelimiter(carbonLoadModel.getCsvDelimiter, configuration)
-        CSVInputFormat.setEscapeCharacter(carbonLoadModel.getEscapeChar, configuration)
-        CSVInputFormat.setHeaderExtractionEnabled(
-          carbonLoadModel.getCsvHeader == null || carbonLoadModel.getCsvHeader.isEmpty,
-          configuration)
-        CSVInputFormat.setQuoteCharacter(carbonLoadModel.getQuoteChar, configuration)
+        CSVInputFormat.setCommentCharacter(configuration, carbonLoadModel.getCommentChar)
+        CSVInputFormat.setCSVDelimiter(configuration, carbonLoadModel.getCsvDelimiter)
+        CSVInputFormat.setEscapeCharacter(configuration, carbonLoadModel.getEscapeChar)
+        CSVInputFormat.setHeaderExtractionEnabled(configuration,
+          carbonLoadModel.getCsvHeader == null || carbonLoadModel.getCsvHeader.isEmpty)
+        CSVInputFormat.setQuoteCharacter(configuration, carbonLoadModel.getQuoteChar)
+        CSVInputFormat.setReadBufferSize(configuration, CarbonProperties.getInstance
+          .getProperty(CarbonCommonConstants.CSV_READ_BUFFER_SIZE,
+            CarbonCommonConstants.CSV_READ_BUFFER_SIZE_DEFAULT))
       }
 
       /**


Mime
View raw message