carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jack...@apache.org
Subject [1/2] incubator-carbondata git commit: Fixed testcase issues in spark 1.6 and 2.1 of no kettle. And also refactored insert into flow of no kettle
Date Fri, 13 Jan 2017 15:52:48 GMT
Repository: incubator-carbondata
Updated Branches:
  refs/heads/master 32354b37b -> 6d29fa2f0


Fixed testcase issues in spark 1.6 and 2.1 of no kettle. And also refactored insert into flow of no kettle


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/8100d949
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/8100d949
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/8100d949

Branch: refs/heads/master
Commit: 8100d949e4264651b09b42a173aa17085369cd82
Parents: 32354b3
Author: ravipesala <ravi.pesala@gmail.com>
Authored: Fri Jan 13 00:16:33 2017 +0530
Committer: jackylk <jacky.likun@huawei.com>
Committed: Fri Jan 13 23:52:01 2017 +0800

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   | 10 +++++
 .../spark/rdd/CarbonDataLoadRDD.scala           | 18 ++++----
 .../spark/rdd/NewCarbonDataLoadRDD.scala        | 30 ++++---------
 .../processing/csvreaderstep/CsvInput.java      | 11 ++---
 .../csvreaderstep/JavaRddIterator.java          | 32 --------------
 .../processing/csvreaderstep/RddInputUtils.java | 12 +++---
 .../processing/model/CarbonLoadModel.java       | 15 +++++++
 .../newflow/CarbonDataLoadConfiguration.java    | 10 +++++
 .../newflow/DataLoadProcessBuilder.java         |  1 +
 .../sort/impl/ParallelReadMergeSorterImpl.java  |  3 --
 .../newflow/steps/InputProcessorStepImpl.java   | 45 +++++++++++++++-----
 .../sortandgroupby/sortdata/SortDataRows.java   |  7 ++-
 .../util/CarbonDataProcessorUtil.java           | 30 -------------
 13 files changed, 105 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8100d949/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 041f5ed..664720e 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1115,6 +1115,16 @@ public final class CarbonCommonConstants {
    */
   public static final String USE_OFFHEAP_IN_QUERY_PROCSSING_DEFAULT = "true";
 
+  /**
+   * whether to prefetch data while loading.
+   */
+  public static final String USE_PREFETCH_WHILE_LOADING = "carbon.loading.prefetch";
+
+  /**
+   * default value for prefetch data while loading.
+   */
+  public static final String USE_PREFETCH_WHILE_LOADING_DEFAULT = "false";
+
   private CarbonCommonConstants() {
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8100d949/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
index 4392775..14a0930 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
@@ -33,21 +33,21 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.execution.command.ExecutionErrors
 import org.apache.spark.util.SparkUtil
 
+import org.apache.carbondata.common.CarbonIterator
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.common.logging.impl.StandardLogService
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.load.{BlockDetails, LoadMetadataDetails}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory}
 import org.apache.carbondata.processing.constants.DataProcessorConstants
-import org.apache.carbondata.processing.csvreaderstep.{JavaRddIterator, RddInputUtils}
+import org.apache.carbondata.processing.csvreaderstep.RddInputUtils
 import org.apache.carbondata.processing.etl.DataLoadingException
 import org.apache.carbondata.processing.graphgenerator.GraphGenerator
 import org.apache.carbondata.processing.model.CarbonLoadModel
 import org.apache.carbondata.spark.DataLoadResult
-import org.apache.carbondata.spark.load.{_}
+import org.apache.carbondata.spark.load._
 import org.apache.carbondata.spark.splits.TableSplit
-import org.apache.carbondata.spark.util.CarbonQueryUtil
-import org.apache.carbondata.spark.util.CarbonScalaUtil
+import org.apache.carbondata.spark.util.{CarbonQueryUtil, CarbonScalaUtil}
 
 /**
  * This partition class use to split by TableSplit
@@ -549,12 +549,12 @@ class DataFrameLoaderRDD[K, V](
 
 class PartitionIterator(partitionIter: Iterator[DataLoadPartitionWrap[Row]],
     carbonLoadModel: CarbonLoadModel,
-    context: TaskContext) extends JavaRddIterator[JavaRddIterator[Array[String]]] {
+    context: TaskContext) extends CarbonIterator[CarbonIterator[Array[String]]] {
   val serializer = SparkEnv.get.closureSerializer.newInstance()
   var serializeBuffer: ByteBuffer = null
   def hasNext: Boolean = partitionIter.hasNext
 
-  def next: JavaRddIterator[Array[String]] = {
+  def next: CarbonIterator[Array[String]] = {
     val value = partitionIter.next
     // The rdd (which come from Hive Table) don't support to read dataframe concurrently.
     // So here will create different rdd instance for each thread.
@@ -569,7 +569,7 @@ class PartitionIterator(partitionIter: Iterator[DataLoadPartitionWrap[Row]],
         carbonLoadModel,
         context)
   }
-  def initialize: Unit = {
+  override def initialize: Unit = {
     SparkUtil.setTaskContext(context)
   }
 }
@@ -583,7 +583,7 @@ class PartitionIterator(partitionIter: Iterator[DataLoadPartitionWrap[Row]],
  */
 class RddIterator(rddIter: Iterator[Row],
                   carbonLoadModel: CarbonLoadModel,
-                  context: TaskContext) extends JavaRddIterator[Array[String]] {
+                  context: TaskContext) extends CarbonIterator[Array[String]] {
 
   val formatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
     .CARBON_TIMESTAMP_FORMAT, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
@@ -604,7 +604,7 @@ class RddIterator(rddIter: Iterator[Row],
     columns
   }
 
-  def initialize: Unit = {
+  override def initialize: Unit = {
     SparkUtil.setTaskContext(context)
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8100d949/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 497df75..46e83a5 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -33,7 +33,6 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 import org.apache.spark.{Partition, SparkContext, SparkEnv, TaskContext}
 import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionWrap, RDD}
 import org.apache.spark.sql.Row
-import org.apache.spark.sql.execution.command.Partitioner
 import org.apache.spark.util.SparkUtil
 
 import org.apache.carbondata.common.CarbonIterator
@@ -44,7 +43,6 @@ import org.apache.carbondata.core.load.{BlockDetails, LoadMetadataDetails}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory}
 import org.apache.carbondata.hadoop.csv.CSVInputFormat
 import org.apache.carbondata.hadoop.csv.recorditerator.RecordReaderIterator
-import org.apache.carbondata.processing.csvreaderstep.JavaRddIterator
 import org.apache.carbondata.processing.model.CarbonLoadModel
 import org.apache.carbondata.processing.newflow.DataLoadExecutor
 import org.apache.carbondata.processing.newflow.exception.BadRecordFoundException
@@ -151,6 +149,9 @@ class NewCarbonDataLoadRDD[K, V](
         loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)
 
         carbonLoadModel.setSegmentId(String.valueOf(loadCount))
+        val preFetch = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
+          .USE_PREFETCH_WHILE_LOADING, CarbonCommonConstants.USE_PREFETCH_WHILE_LOADING_DEFAULT)
+        carbonLoadModel.setPreFetch(preFetch.toBoolean)
         val recordReaders = getInputIterators
         val loader = new SparkPartitionLoader(model,
           theSplit.index,
@@ -330,6 +331,7 @@ class NewDataFrameLoaderRDD[K, V](
         carbonLoadModel.setPartitionId(partitionID)
         carbonLoadModel.setSegmentId(String.valueOf(loadCount))
         carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
+        carbonLoadModel.setPreFetch(false)
 
         val recordReaders = mutable.Buffer[CarbonIterator[Array[AnyRef]]]()
         val partitionIterator = firstParent[DataLoadPartitionWrap[Row]].iterator(theSplit, context)
@@ -344,10 +346,9 @@ class NewDataFrameLoaderRDD[K, V](
             serializeBuffer.rewind()
             serializer.deserialize[RDD[Row]](serializeBuffer)
           }
-          recordReaders += new CarbonIteratorImpl(
-            new NewRddIterator(newInstance.iterator(value.partition, context),
+          recordReaders += new NewRddIterator(newInstance.iterator(value.partition, context),
               carbonLoadModel,
-              context))
+              context)
         }
 
         val loader = new SparkPartitionLoader(model,
@@ -395,7 +396,7 @@ class NewDataFrameLoaderRDD[K, V](
  */
 class NewRddIterator(rddIter: Iterator[Row],
     carbonLoadModel: CarbonLoadModel,
-    context: TaskContext) extends JavaRddIterator[Array[AnyRef]] {
+    context: TaskContext) extends CarbonIterator[Array[AnyRef]] {
 
   val formatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
     .CARBON_TIMESTAMP_FORMAT, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
@@ -416,23 +417,8 @@ class NewRddIterator(rddIter: Iterator[Row],
     columns
   }
 
-  def initialize: Unit = {
+  override def initialize: Unit = {
     SparkUtil.setTaskContext(context)
   }
 
 }
-
-class CarbonIteratorImpl(iterator: NewRddIterator)
-  extends CarbonIterator[Array[AnyRef]] {
-  override def initialize(): Unit = iterator.initialize
-
-  override def close(): Unit = {}
-
-  override def next(): Array[AnyRef] = {
-    iterator.next
-  }
-
-  override def hasNext: Boolean = {
-    iterator.hasNext
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8100d949/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/CsvInput.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/CsvInput.java b/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/CsvInput.java
index 48efec8..f47babc 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/CsvInput.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/CsvInput.java
@@ -30,6 +30,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
+import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.common.logging.impl.StandardLogService;
@@ -193,13 +194,13 @@ public class CsvInput extends BaseStep implements StepInterface {
   }
 
   class RddScanCallable implements Callable<Void> {
-    List<JavaRddIterator<String[]>> iterList;
+    List<CarbonIterator<String[]>> iterList;
 
     RddScanCallable() {
-      this.iterList = new ArrayList<JavaRddIterator<String[]>>(1000);
+      this.iterList = new ArrayList<CarbonIterator<String[]>>(1000);
     }
 
-    public void addJavaRddIterator(JavaRddIterator<String[]> iter) {
+    public void addJavaRddIterator(CarbonIterator<String[]> iter) {
       this.iterList.add(iter);
     }
 
@@ -209,7 +210,7 @@ public class CsvInput extends BaseStep implements StepInterface {
           Thread.currentThread().getName());
       try {
         String[] values = null;
-        for (JavaRddIterator<String[]> iter: iterList) {
+        for (CarbonIterator<String[]> iter: iterList) {
           iter.initialize();
           while (iter.hasNext()) {
             values = iter.next();
@@ -227,7 +228,7 @@ public class CsvInput extends BaseStep implements StepInterface {
   };
 
   private void scanRddIterator(int numberOfNodes) throws RuntimeException {
-    JavaRddIterator<JavaRddIterator<String[]>> iter = RddInputUtils.getAndRemove(rddIteratorKey);
+    CarbonIterator<CarbonIterator<String[]>> iter = RddInputUtils.getAndRemove(rddIteratorKey);
     if (iter != null) {
       iter.initialize();
       exec = Executors.newFixedThreadPool(numberOfNodes);

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8100d949/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/JavaRddIterator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/JavaRddIterator.java b/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/JavaRddIterator.java
deleted file mode 100644
index 9e11816..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/JavaRddIterator.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.carbondata.processing.csvreaderstep;
-/**
- * JavaRddIterator wrap spark rdd iterator.
- * It can avoid this module dependency spark module.
- * @param <E>
- */
-public interface JavaRddIterator<E> {
-
-  boolean hasNext();
-
-  E next();
-
-  void initialize();
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8100d949/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/RddInputUtils.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/RddInputUtils.java b/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/RddInputUtils.java
index b3dfdab..8d87e16 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/RddInputUtils.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/RddInputUtils.java
@@ -22,16 +22,18 @@ package org.apache.carbondata.processing.csvreaderstep;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.carbondata.common.CarbonIterator;
+
 public class RddInputUtils {
-  private static Map<String, JavaRddIterator<JavaRddIterator<String[]>>> iteratorMap = new
-      HashMap<String, JavaRddIterator<JavaRddIterator<String[]>>>();
+  private static Map<String, CarbonIterator<CarbonIterator<String[]>>> iteratorMap = new
+      HashMap<String, CarbonIterator<CarbonIterator<String[]>>>();
 
-  public static void put(String key, JavaRddIterator<JavaRddIterator<String[]>> value) {
+  public static void put(String key, CarbonIterator<CarbonIterator<String[]>> value) {
     iteratorMap.put(key, value);
   }
 
-  public static JavaRddIterator<JavaRddIterator<String[]>> getAndRemove(String key) {
-    JavaRddIterator<JavaRddIterator<String[]>> iter = iteratorMap.get(key);
+  public static CarbonIterator<CarbonIterator<String[]>> getAndRemove(String key) {
+    CarbonIterator<CarbonIterator<String[]>> iter = iteratorMap.get(key);
     remove(key);
     return iter;
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8100d949/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
index 9071dbe..8387bc0 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
@@ -166,6 +166,11 @@ public class CarbonLoadModel implements Serializable {
   private int dictionaryServerPort;
 
   /**
+   * Pre fetch data from csv reader
+   */
+  private boolean preFetch;
+
+  /**
    * get escape char
    * @return
    */
@@ -381,6 +386,7 @@ public class CarbonLoadModel implements Serializable {
     copy.useOnePass = useOnePass;
     copy.dictionaryServerHost = dictionaryServerHost;
     copy.dictionaryServerPort = dictionaryServerPort;
+    copy.preFetch = preFetch;
     return copy;
   }
 
@@ -430,6 +436,7 @@ public class CarbonLoadModel implements Serializable {
     copyObj.useOnePass = useOnePass;
     copyObj.dictionaryServerHost = dictionaryServerHost;
     copyObj.dictionaryServerPort = dictionaryServerPort;
+    copyObj.preFetch = preFetch;
     return copyObj;
   }
 
@@ -726,4 +733,12 @@ public class CarbonLoadModel implements Serializable {
   public void setDictionaryServerHost(String dictionaryServerHost) {
     this.dictionaryServerHost = dictionaryServerHost;
   }
+
+  public boolean isPreFetch() {
+    return preFetch;
+  }
+
+  public void setPreFetch(boolean preFetch) {
+    this.preFetch = preFetch;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8100d949/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java
index 7450b1f..92e478e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java
@@ -56,6 +56,8 @@ public class CarbonDataLoadConfiguration {
    */
   private int dictionaryServerPort;
 
+  private boolean preFetch;
+
   public CarbonDataLoadConfiguration() {
   }
 
@@ -186,4 +188,12 @@ public class CarbonDataLoadConfiguration {
   public void setDictionaryServerPort(int dictionaryServerPort) {
     this.dictionaryServerPort = dictionaryServerPort;
   }
+
+  public boolean isPreFetch() {
+    return preFetch;
+  }
+
+  public void setPreFetch(boolean preFetch) {
+    this.preFetch = preFetch;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8100d949/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
index 2158219..5426227 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
@@ -166,6 +166,7 @@ public final class DataLoadProcessBuilder {
     configuration.setUseOnePass(loadModel.getUseOnePass());
     configuration.setDictionaryServerHost(loadModel.getDictionaryServerHost());
     configuration.setDictionaryServerPort(loadModel.getDictionaryServerPort());
+    configuration.setPreFetch(loadModel.isPreFetch());
 
     return configuration;
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8100d949/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
index 036d15a..181b1e7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
@@ -54,8 +54,6 @@ public class ParallelReadMergeSorterImpl implements Sorter {
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(ParallelReadMergeSorterImpl.class.getName());
 
-  private static final Object taskContext = CarbonDataProcessorUtil.fetchTaskContext();
-
   private SortParameters sortParameters;
 
   private SortIntermediateFileMerger intermediateFileMerger;
@@ -196,7 +194,6 @@ public class ParallelReadMergeSorterImpl implements Sorter {
     @Override
     public Void call() throws CarbonDataLoadingException {
       try {
-        CarbonDataProcessorUtil.configureTaskContext(taskContext);
         while (iterator.hasNext()) {
           CarbonRowBatch batch = iterator.next();
           Iterator<CarbonRow> batchIterator = batch.getBatchIterator();

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8100d949/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java
index 23f100e..0e0fba4 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java
@@ -55,7 +55,8 @@ public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep {
     Iterator<CarbonRowBatch>[] outIterators = new Iterator[readerIterators.length];
     for (int i = 0; i < outIterators.length; i++) {
       outIterators[i] =
-          new InputProcessorIterator(readerIterators[i], rowParser, batchSize, executorService);
+          new InputProcessorIterator(readerIterators[i], rowParser, batchSize,
+              configuration.isPreFetch(), executorService);
     }
     return outIterators;
   }
@@ -115,17 +116,22 @@ public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep {
 
     private boolean nextBatch;
 
+    private boolean firstTime;
+
+    private boolean preFetch;
+
     public InputProcessorIterator(List<CarbonIterator<Object[]>> inputIterators,
-        RowParser rowParser, int batchSize, ExecutorService executorService) {
+        RowParser rowParser, int batchSize, boolean preFetch, ExecutorService executorService) {
       this.inputIterators = inputIterators;
       this.batchSize = batchSize;
       this.rowParser = rowParser;
       this.counter = 0;
       // Get the first iterator from the list.
       currentIterator = inputIterators.get(counter++);
-      currentIterator.initialize();
       this.executorService = executorService;
+      this.preFetch = preFetch;
       this.nextBatch = false;
+      this.firstTime = true;
     }
 
     @Override
@@ -134,6 +140,10 @@ public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep {
     }
 
     private boolean internalHasNext() {
+      if (firstTime) {
+        firstTime = false;
+        currentIterator.initialize();
+      }
       boolean hasNext = currentIterator.hasNext();
       // If iterator is finished then check for next iterator.
       if (!hasNext) {
@@ -151,6 +161,14 @@ public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep {
 
     @Override
     public CarbonRowBatch next() {
+      if (preFetch) {
+        return getCarbonRowBatchWithPreFetch();
+      } else {
+        return getBatch();
+      }
+    }
+
+    private CarbonRowBatch getCarbonRowBatchWithPreFetch() {
       CarbonRowBatch result = null;
       if (future == null) {
         future = getCarbonRowBatch();
@@ -174,17 +192,22 @@ public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep {
     private Future<CarbonRowBatch> getCarbonRowBatch() {
       return executorService.submit(new Callable<CarbonRowBatch>() {
         @Override public CarbonRowBatch call() throws Exception {
-          // Create batch and fill it.
-          CarbonRowBatch carbonRowBatch = new CarbonRowBatch();
-          int count = 0;
-          while (internalHasNext() && count < batchSize) {
-            carbonRowBatch.addRow(new CarbonRow(rowParser.parseRow(currentIterator.next())));
-            count++;
-          }
-          return carbonRowBatch;
+          return getBatch();
+
         }
       });
     }
+
+    private CarbonRowBatch getBatch() {
+      // Create batch and fill it.
+      CarbonRowBatch carbonRowBatch = new CarbonRowBatch();
+      int count = 0;
+      while (internalHasNext() && count < batchSize) {
+        carbonRowBatch.addRow(new CarbonRow(rowParser.parseRow(currentIterator.next())));
+        count++;
+      }
+      return carbonRowBatch;
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8100d949/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
index 7231775..4495d00 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
@@ -35,6 +35,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
@@ -82,7 +83,9 @@ public class SortDataRows {
 
     this.intermediateFileMerger = intermediateFileMerger;
 
-    this.sortBufferSize = parameters.getSortBufferSize();
+    int batchSize = CarbonProperties.getInstance().getBatchSize();
+
+    this.sortBufferSize = Math.max(parameters.getSortBufferSize(), batchSize);
     // observer of writing file in thread
     this.threadStatusObserver = new ThreadStatusObserver();
   }
@@ -95,7 +98,7 @@ public class SortDataRows {
     // create holder list which will hold incoming rows
     // size of list will be sort buffer size + 1 to avoid creation of new
     // array in list array
-    this.recordHolderList = new Object[parameters.getSortBufferSize()][];
+    this.recordHolderList = new Object[sortBufferSize][];
     // Delete if any older file exists in sort temp folder
     deleteSortLocationIfExists();
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8100d949/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index 5956792..2bb4fe6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -21,8 +21,6 @@ package org.apache.carbondata.processing.util;
 
 import java.io.File;
 import java.io.IOException;
-import java.lang.reflect.Field;
-import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -502,32 +500,4 @@ public final class CarbonDataProcessorUtil {
     return dateformatsHashMap;
   }
 
-  /**
-   * Maybe we can extract interfaces later to support task context in hive ,spark
-   */
-  public static Object fetchTaskContext() {
-    try {
-      return Class.forName("org.apache.spark.TaskContext").getMethod("get").invoke(null);
-    } catch (Exception e) {
-      //just ignore
-      LOGGER.info("org.apache.spark.TaskContext not found");
-      return null;
-    }
-  }
-
-  public static void configureTaskContext(Object context) {
-    try {
-      Class clazz = Class.forName("org.apache.spark.TaskContext$");
-      for (Method method : clazz.getDeclaredMethods()) {
-        if (method.getName().equals("setTaskContext")) {
-          Field field = clazz.getField("MODULE$");
-          Object instance = field.get(null);
-          method.invoke(instance, new Object[]{context});
-        }
-      }
-    } catch (Exception e) {
-      //just ignore
-      LOGGER.info("org.apache.spark.TaskContext not found");
-    }
-  }
 }
\ No newline at end of file


Mime
View raw message