carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jack...@apache.org
Subject [32/50] [abbrv] carbondata git commit: [CARBONDATA-2023][DataLoad] Add size based block allocation in data loading
Date Sun, 04 Mar 2018 12:24:52 GMT
[CARBONDATA-2023][DataLoad] Add size based block allocation in data loading

Carbondata assign blocks to nodes at the beginning of data loading.
The previous block allocation strategy is block number based and it will
suffer from the skewed data problem if the size of input files differs a lot.

We introduced a size based block allocation strategy to optimize data
loading performance in skewed data scenario.

This closes #1808


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/111bb5c4
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/111bb5c4
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/111bb5c4

Branch: refs/heads/carbonstore-rebase5
Commit: 111bb5c41946d72604964658ec8562f7722dec14
Parents: ef81248
Author: xuchuanyin <xuchuanyin@hust.edu.cn>
Authored: Thu Feb 8 14:42:39 2018 +0800
Committer: Jacky Li <jacky.likun@qq.com>
Committed: Sun Mar 4 19:54:03 2018 +0800

----------------------------------------------------------------------
 .../constants/CarbonLoadOptionConstants.java    |  10 +
 .../core/datastore/block/TableBlockInfo.java    |  29 ++
 .../carbondata/core/util/CarbonProperties.java  |  11 +
 docs/useful-tips-on-carbondata.md               |   1 +
 .../CarbonIndexFileMergeTestCase.scala          |   4 -
 .../StandardPartitionTableLoadingTestCase.scala |   2 +-
 .../spark/rdd/NewCarbonDataLoadRDD.scala        |   4 +-
 .../spark/sql/hive/DistributionUtil.scala       |   2 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |  18 +-
 .../merger/NodeMultiBlockRelation.java          |  40 ++
 .../processing/util/CarbonLoaderUtil.java       | 494 ++++++++++++-------
 .../processing/util/CarbonLoaderUtilTest.java   | 125 +++++
 12 files changed, 552 insertions(+), 188 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/111bb5c4/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
index bcfeba0..a6bf60f 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
@@ -114,4 +114,14 @@ public final class CarbonLoadOptionConstants {
    */
   public static final int MAX_EXTERNAL_DICTIONARY_SIZE = 10000000;
 
+  /**
+   * enable block size based block allocation while loading data. By default, carbondata assigns
+   * blocks to nodes based on block number. If this option is set to `true`, carbondata will
+   * consider block size first and make sure that all the nodes will process almost equal size of
+   * data. This option is especially useful when you encounter skewed data.
+   */
+  @CarbonProperty
+  public static final String ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION
+      = "carbon.load.skewedDataOptimization.enabled";
+  public static final String ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION_DEFAULT = "false";
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/111bb5c4/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
index a7bfdba..c0cebe0 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
@@ -19,6 +19,8 @@ package org.apache.carbondata.core.datastore.block;
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -98,6 +100,20 @@ public class TableBlockInfo implements Distributable, Serializable {
 
   private String dataMapWriterPath;
 
+  /**
+   * comparator to sort by block size in descending order.
+   * Since each line is not exactly the same, the size of an InputSplit may differ,
+   * so we allow some deviation for these splits.
+   */
+  public static final Comparator<Distributable> DATA_SIZE_DESC_COMPARATOR =
+      new Comparator<Distributable>() {
+        @Override public int compare(Distributable o1, Distributable o2) {
+          long diff =
+              ((TableBlockInfo) o1).getBlockLength() - ((TableBlockInfo) o2).getBlockLength();
+          return diff < 0 ? 1 : (diff == 0 ? 0 : -1);
+        }
+      };
+
   public TableBlockInfo(String filePath, long blockOffset, String segmentId,
       String[] locations, long blockLength, ColumnarFormatVersion version,
       String[] deletedDeltaFilePath) {
@@ -434,4 +450,17 @@ public class TableBlockInfo implements Distributable, Serializable {
   public void setDataMapWriterPath(String dataMapWriterPath) {
     this.dataMapWriterPath = dataMapWriterPath;
   }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder("TableBlockInfo{");
+    sb.append("filePath='").append(filePath).append('\'');
+    sb.append(", blockOffset=").append(blockOffset);
+    sb.append(", blockLength=").append(blockLength);
+    sb.append(", segmentId='").append(segmentId).append('\'');
+    sb.append(", blockletId='").append(blockletId).append('\'');
+    sb.append(", locations=").append(Arrays.toString(locations));
+    sb.append('}');
+    return sb.toString();
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/111bb5c4/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 8b81c1e..6fd087b 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -1235,6 +1235,17 @@ public final class CarbonProperties {
       return CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR_DEFAULT;
     }
   }
+
+  /**
+   * whether optimization for skewed data is enabled
+   * @return true, if enabled; false for not enabled.
+   */
+  public boolean isLoadSkewedDataOptimizationEnabled() {
+    String skewedEnabled = getProperty(
+        CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION,
+        CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION_DEFAULT);
+    return skewedEnabled.equalsIgnoreCase("true");
+  }
   /**
    * returns true if carbon property
    * @param key

http://git-wip-us.apache.org/repos/asf/carbondata/blob/111bb5c4/docs/useful-tips-on-carbondata.md
----------------------------------------------------------------------
diff --git a/docs/useful-tips-on-carbondata.md b/docs/useful-tips-on-carbondata.md
index 4d43003..ff339d0 100644
--- a/docs/useful-tips-on-carbondata.md
+++ b/docs/useful-tips-on-carbondata.md
@@ -169,5 +169,6 @@
   | carbon.use.local.dir | spark/carbonlib/carbon.properties | Data loading | Whether use YARN local directories for multi-table load disk load balance | If this is set it to true CarbonData will use YARN local directories for multi-table load disk load balance, that will improve the data load performance. |
   | carbon.use.multiple.temp.dir | spark/carbonlib/carbon.properties | Data loading | Whether to use multiple YARN local directories during table data loading for disk load balance | After enabling 'carbon.use.local.dir', if this is set to true, CarbonData will use all YARN local directories during data load for disk load balance, that will improve the data load performance. Please enable this property when you encounter disk hotspot problem during data loading. |
   | carbon.sort.temp.compressor | spark/carbonlib/carbon.properties | Data loading | Specify the name of compressor to compress the intermediate sort temporary files during sort procedure in data loading. | The optional values are 'SNAPPY','GZIP','BZIP2','LZ4' and empty. By default, empty means that Carbondata will not compress the sort temp files. This parameter will be useful if you encounter disk bottleneck. |
+  | carbon.load.skewedDataOptimization.enabled | spark/carbonlib/carbon.properties | Data loading | Whether to enable size based block allocation strategy for data loading. | When loading, carbondata will use file size based block allocation strategy for task distribution. It will make sure that all the executors process the same size of data -- It's useful if the size of your input data files varies widely, say 1MB~1GB. |
 
   Note: If your CarbonData instance is provided only for query, you may specify the property 'spark.speculation=true' which is in conf directory of spark.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/111bb5c4/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
index 7608318..aace3ea 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
@@ -85,7 +85,6 @@ class CarbonIndexFileMergeTestCase
     assert(getIndexFileCount("default_nonindexmerge", "0") == 100)
     assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
     val table = CarbonMetadata.getInstance().getCarbonTable("default","nonindexmerge")
-    val carbonTablePath = new CarbonTablePath(table.getCarbonTableIdentifier, table.getTablePath)
     new CarbonIndexFileMergeWriter()
       .mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false)
     new CarbonIndexFileMergeWriter()
@@ -111,7 +110,6 @@ class CarbonIndexFileMergeTestCase
     assert(getIndexFileCount("default_nonindexmerge", "0") == 100)
     assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
     val table = CarbonMetadata.getInstance().getCarbonTable("default","nonindexmerge")
-    val carbonTablePath = new CarbonTablePath(table.getCarbonTableIdentifier, table.getTablePath)
     new CarbonIndexFileMergeWriter()
       .mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false)
     new CarbonIndexFileMergeWriter()
@@ -141,7 +139,6 @@ class CarbonIndexFileMergeTestCase
     assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
     sql("ALTER TABLE nonindexmerge COMPACT 'minor'").collect()
     val table = CarbonMetadata.getInstance().getCarbonTable("default","nonindexmerge")
-    val carbonTablePath = new CarbonTablePath(table.getCarbonTableIdentifier, table.getTablePath)
     new CarbonIndexFileMergeWriter()
       .mergeCarbonIndexFilesOfSegment("0.1", table.getTablePath, false)
     assert(getIndexFileCount("default_nonindexmerge", "0.1") == 0)
@@ -171,7 +168,6 @@ class CarbonIndexFileMergeTestCase
     assert(getIndexFileCount("default_nonindexmerge", "3") == 100)
     sql("ALTER TABLE nonindexmerge COMPACT 'minor'").collect()
     val table = CarbonMetadata.getInstance().getCarbonTable("default","nonindexmerge")
-    val carbonTablePath = new CarbonTablePath(table.getCarbonTableIdentifier, table.getTablePath)
     new CarbonIndexFileMergeWriter()
       .mergeCarbonIndexFilesOfSegment("0.1", table.getTablePath, false)
     assert(getIndexFileCount("default_nonindexmerge", "0") == 100)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/111bb5c4/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
index 2ce46ef..baf1627 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
@@ -35,7 +35,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.{CarbonMetadata, SegmentFileStore}
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.spark.rdd.CarbonScanRDD
 
 class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfterAll {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/111bb5c4/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 06acbba..8ba2767 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -281,8 +281,10 @@ class NewCarbonDataLoadRDD[K, V](
         val format = new CSVInputFormat
 
         val split = theSplit.asInstanceOf[CarbonNodePartition]
+        val inputSize = split.blocksDetails.map(_.getBlockLength).sum * 0.1 * 10  / 1024 / 1024
         logInfo("Input split: " + split.serializableHadoopSplit)
-        logInfo("The Block Count in this node :" + split.nodeBlocksDetail.length)
+        logInfo("The block count in this node: " + split.nodeBlocksDetail.length)
+        logInfo(f"The input data size in this node: $inputSize%.2fMB")
         CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordHostBlockMap(
             split.serializableHadoopSplit, split.nodeBlocksDetail.length)
         carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/111bb5c4/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
index 1958d61..a676dd8 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
@@ -128,7 +128,7 @@ object DistributionUtil {
    */
   def ensureExecutorsAndGetNodeList(blockList: Seq[Distributable],
       sparkContext: SparkContext): Seq[String] = {
-    val nodeMapping = CarbonLoaderUtil.getRequiredExecutors(blockList.asJava)
+    val nodeMapping = CarbonLoaderUtil.nodeBlockMapping(blockList.asJava)
     ensureExecutorsByNumberAndGetNodeList(nodeMapping, blockList, sparkContext)
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/111bb5c4/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 349c436..1d65b00 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -1029,10 +1029,16 @@ object CarbonDataRDDFactory {
     val startTime = System.currentTimeMillis
     val activeNodes = DistributionUtil
       .ensureExecutorsAndGetNodeList(blockList, sqlContext.sparkContext)
-    val nodeBlockMapping =
-      CarbonLoaderUtil
-        .nodeBlockMapping(blockList.toSeq.asJava, -1, activeNodes.toList.asJava).asScala
-        .toSeq
+    val skewedDataOptimization = CarbonProperties.getInstance()
+      .isLoadSkewedDataOptimizationEnabled()
+    val blockAssignStrategy = if (skewedDataOptimization) {
+      CarbonLoaderUtil.BlockAssignmentStrategy.BLOCK_SIZE_FIRST
+    } else {
+      CarbonLoaderUtil.BlockAssignmentStrategy.BLOCK_NUM_FIRST
+    }
+    LOGGER.info(s"Allocating block to nodes using strategy: $blockAssignStrategy")
+    val nodeBlockMapping = CarbonLoaderUtil.nodeBlockMapping(blockList.toSeq.asJava, -1,
+      activeNodes.toList.asJava, blockAssignStrategy).asScala.toSeq
     val timeElapsed: Long = System.currentTimeMillis - startTime
     LOGGER.info("Total Time taken in block allocation: " + timeElapsed)
     LOGGER.info(s"Total no of blocks: ${ blockList.length }, " +
@@ -1040,7 +1046,9 @@ object CarbonDataRDDFactory {
     var str = ""
     nodeBlockMapping.foreach { entry =>
       val tableBlock = entry._2
-      str = str + "#Node: " + entry._1 + " no.of.blocks: " + tableBlock.size()
+      val totalSize = tableBlock.asScala.map(_.asInstanceOf[TableBlockInfo].getBlockLength).sum
+      str = str + "#Node: " + entry._1 + ", no.of.blocks: " + tableBlock.size() +
+            f", totalsize.of.blocks: ${totalSize * 0.1 * 10 / 1024 /1024}%.2fMB"
       tableBlock.asScala.foreach(tableBlockInfo =>
         if (!tableBlockInfo.getLocations.exists(hostentry =>
           hostentry.equalsIgnoreCase(entry._1)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/111bb5c4/processing/src/main/java/org/apache/carbondata/processing/merger/NodeMultiBlockRelation.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/NodeMultiBlockRelation.java b/processing/src/main/java/org/apache/carbondata/processing/merger/NodeMultiBlockRelation.java
index ec2ddaf..1bb5736 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/NodeMultiBlockRelation.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/NodeMultiBlockRelation.java
@@ -16,15 +16,41 @@
  */
 package org.apache.carbondata.processing.merger;
 
+import java.util.Comparator;
 import java.util.List;
 
 import org.apache.carbondata.core.datastore.block.Distributable;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
 
 public class NodeMultiBlockRelation implements Comparable<NodeMultiBlockRelation> {
 
   private final List<Distributable> blocks;
   private final String node;
 
+  /**
+   * comparator to sort by data size in descending order. This is used to assign big blocks to
+   * bigger nodes first.
+   */
+  public static final Comparator<NodeMultiBlockRelation> DATA_SIZE_DESC_COMPARATOR =
+      new Comparator<NodeMultiBlockRelation>() {
+        @Override
+        public int compare(NodeMultiBlockRelation o1, NodeMultiBlockRelation o2) {
+          long diff = o1.getTotalSizeOfBlocks() - o2.getTotalSizeOfBlocks();
+          return diff > 0 ? -1 : (diff < 0 ? 1 : 0);
+        }
+      };
+  /**
+   * comparator to sort by data size in ascending order. This is used to assign leftover blocks to
+   * smaller nodes first.
+   */
+  public static final Comparator<NodeMultiBlockRelation> DATA_SIZE_ASC_COMPARATOR =
+      new Comparator<NodeMultiBlockRelation>() {
+        @Override
+        public int compare(NodeMultiBlockRelation o1, NodeMultiBlockRelation o2) {
+          long diff = o1.getTotalSizeOfBlocks() - o2.getTotalSizeOfBlocks();
+          return diff > 0 ? 1 : (diff < 0 ? -1 : 0);
+        }
+      };
   public NodeMultiBlockRelation(String node, List<Distributable> blocks) {
     this.node = node;
     this.blocks = blocks;
@@ -39,6 +65,20 @@ public class NodeMultiBlockRelation implements Comparable<NodeMultiBlockRelation
     return node;
   }
 
+  /**
+   * get the total size of the blocks
+   * @return size in bytes
+   */
+  public long getTotalSizeOfBlocks() {
+    long totalSize = 0;
+    if (blocks.get(0) instanceof TableBlockInfo) {
+      for (Distributable block : blocks) {
+        totalSize += ((TableBlockInfo) block).getBlockLength();
+      }
+    }
+    return totalSize;
+  }
+
   @Override public int compareTo(NodeMultiBlockRelation obj) {
     return this.blocks.size() - obj.getBlocks().size();
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/111bb5c4/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 3302094..1f93ba1 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -24,7 +24,16 @@ import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.nio.charset.Charset;
 import java.text.SimpleDateFormat;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -36,6 +45,7 @@ import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentif
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datastore.block.Distributable;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
@@ -58,7 +68,6 @@ import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
-import org.apache.carbondata.processing.merger.NodeBlockRelation;
 import org.apache.carbondata.processing.merger.NodeMultiBlockRelation;
 import static org.apache.carbondata.core.enums.EscapeSequences.*;
 
@@ -73,6 +82,23 @@ public final class CarbonLoaderUtil {
   private CarbonLoaderUtil() {
   }
 
+  /**
+   * strategy for assign blocks to nodes/executors
+   */
+  public enum BlockAssignmentStrategy {
+    BLOCK_NUM_FIRST("Assign blocks to node base on number of blocks"),
+    BLOCK_SIZE_FIRST("Assign blocks to node base on data size of blocks");
+    private String name;
+    BlockAssignmentStrategy(String name) {
+      this.name = name;
+    }
+
+    @Override
+    public String toString() {
+      return this.getClass().getSimpleName() + ':' + this.name;
+    }
+  }
+
   public static void deleteSegment(CarbonLoadModel loadModel, int currentLoad) {
     String segmentPath = CarbonTablePath.getSegmentPath(
         loadModel.getTablePath(), currentLoad + "");
@@ -548,9 +574,9 @@ public final class CarbonLoaderUtil {
   public static Map<String, List<List<Distributable>>> nodeBlockTaskMapping(
       List<Distributable> blockInfos, int noOfNodesInput, int parallelism,
       List<String> activeNode) {
-
     Map<String, List<Distributable>> mapOfNodes =
-        CarbonLoaderUtil.nodeBlockMapping(blockInfos, noOfNodesInput, activeNode);
+        CarbonLoaderUtil.nodeBlockMapping(blockInfos, noOfNodesInput, activeNode,
+            BlockAssignmentStrategy.BLOCK_NUM_FIRST);
     int taskPerNode = parallelism / mapOfNodes.size();
     //assigning non zero value to noOfTasksPerNode
     int noOfTasksPerNode = taskPerNode == 0 ? 1 : taskPerNode;
@@ -566,7 +592,8 @@ public final class CarbonLoaderUtil {
    */
   public static Map<String, List<Distributable>> nodeBlockMapping(List<Distributable> blockInfos,
       int noOfNodesInput) {
-    return nodeBlockMapping(blockInfos, noOfNodesInput, null);
+    return nodeBlockMapping(blockInfos, noOfNodesInput, null,
+        BlockAssignmentStrategy.BLOCK_NUM_FIRST);
   }
 
   /**
@@ -581,82 +608,59 @@ public final class CarbonLoaderUtil {
   }
 
   /**
-   * the method returns the number of required executors
-   *
-   * @param blockInfos
-   * @return
-   */
-  public static Map<String, List<Distributable>> getRequiredExecutors(
-      List<Distributable> blockInfos) {
-    List<NodeBlockRelation> flattenedList =
-        new ArrayList<NodeBlockRelation>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-    for (Distributable blockInfo : blockInfos) {
-      try {
-        for (String eachNode : blockInfo.getLocations()) {
-          NodeBlockRelation nbr = new NodeBlockRelation(blockInfo, eachNode);
-          flattenedList.add(nbr);
-        }
-      } catch (IOException e) {
-        throw new RuntimeException("error getting location of block: " + blockInfo.toString(), e);
-      }
-    }
-    // sort the flattened data.
-    Collections.sort(flattenedList);
-    Map<String, List<Distributable>> nodeAndBlockMapping =
-        new LinkedHashMap<String, List<Distributable>>(
-            CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-    // from the flattened list create a mapping of node vs Data blocks.
-    createNodeVsBlockMapping(flattenedList, nodeAndBlockMapping);
-    return nodeAndBlockMapping;
-  }
-
-  /**
    * This method will divide the blocks among the nodes as per the data locality
    *
-   * @param blockInfos
+   * @param blockInfos blocks
    * @param noOfNodesInput -1 if number of nodes has to be decided
    *                       based on block location information
-   * @return
+   * @param blockAssignmentStrategy strategy used to assign blocks
+   * @return a map that maps node to blocks
    */
-  public static Map<String, List<Distributable>> nodeBlockMapping(List<Distributable> blockInfos,
-      int noOfNodesInput, List<String> activeNodes) {
-
-    Map<String, List<Distributable>> nodeBlocksMap =
-        new HashMap<String, List<Distributable>>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-    List<NodeBlockRelation> flattenedList =
-        new ArrayList<NodeBlockRelation>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-    Set<Distributable> uniqueBlocks =
-        new HashSet<Distributable>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-    Set<String> nodes = new HashSet<String>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-    createFlattenedListFromMap(blockInfos, flattenedList, uniqueBlocks, nodes);
+  public static Map<String, List<Distributable>> nodeBlockMapping(
+      List<Distributable> blockInfos, int noOfNodesInput, List<String> activeNodes,
+      BlockAssignmentStrategy blockAssignmentStrategy) {
+    ArrayList<NodeMultiBlockRelation> rtnNode2Blocks = new ArrayList<>();
+
+    Set<Distributable> uniqueBlocks = new HashSet<>(blockInfos);
+    ArrayList<NodeMultiBlockRelation> originNode2Blocks = createNode2BlocksMapping(blockInfos);
+    Set<String> nodes = new HashSet<>(originNode2Blocks.size());
+    for (NodeMultiBlockRelation relation : originNode2Blocks) {
+      nodes.add(relation.getNode());
+    }
 
     int noofNodes = (-1 == noOfNodesInput) ? nodes.size() : noOfNodesInput;
     if (null != activeNodes) {
       noofNodes = activeNodes.size();
     }
-    int blocksPerNode = blockInfos.size() / noofNodes;
-    blocksPerNode = blocksPerNode <= 0 ? 1 : blocksPerNode;
 
-    // sort the flattened data.
-    Collections.sort(flattenedList);
-
-    Map<String, List<Distributable>> nodeAndBlockMapping =
-        new LinkedHashMap<String, List<Distributable>>(
-            CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-    // from the flattened list create a mapping of node vs Data blocks.
-    createNodeVsBlockMapping(flattenedList, nodeAndBlockMapping);
+    // calculate the average expected size for each node
+    long sizePerNode = 0;
+    if (BlockAssignmentStrategy.BLOCK_NUM_FIRST == blockAssignmentStrategy) {
+      sizePerNode = blockInfos.size() / noofNodes;
+      sizePerNode = sizePerNode <= 0 ? 1 : sizePerNode;
+    } else if (BlockAssignmentStrategy.BLOCK_SIZE_FIRST == blockAssignmentStrategy) {
+      long totalFileSize = 0;
+      for (Distributable blockInfo : uniqueBlocks) {
+        totalFileSize += ((TableBlockInfo) blockInfo).getBlockLength();
+      }
+      sizePerNode = totalFileSize / noofNodes;
+    }
 
-    // so now we have a map of node vs blocks. allocate the block as per the order
-    createOutputMap(nodeBlocksMap, blocksPerNode, uniqueBlocks, nodeAndBlockMapping, activeNodes);
+    // assign blocks to each node
+    assignBlocksByDataLocality(rtnNode2Blocks, sizePerNode, uniqueBlocks, originNode2Blocks,
+        activeNodes, blockAssignmentStrategy);
 
     // if any blocks remain then assign them to nodes in round robin.
-    assignLeftOverBlocks(nodeBlocksMap, uniqueBlocks, blocksPerNode, activeNodes);
+    assignLeftOverBlocks(rtnNode2Blocks, uniqueBlocks, sizePerNode, activeNodes,
+        blockAssignmentStrategy);
 
-    return nodeBlocksMap;
+    // convert
+    Map<String, List<Distributable>> rtnNodeBlocksMap =
+        new HashMap<String, List<Distributable>>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    for (NodeMultiBlockRelation relation : rtnNode2Blocks) {
+      rtnNodeBlocksMap.put(relation.getNode(), relation.getBlocks());
+    }
+    return rtnNodeBlocksMap;
   }
 
   /**
@@ -731,92 +735,207 @@ public final class CarbonLoaderUtil {
   }
 
   /**
-   * If any left over data blocks are present then assign those to nodes in round robin way.
-   *
-   * @param outputMap
-   * @param uniqueBlocks
+   * If any left over data blocks are present then assign those to nodes in a round robin way.
+   * This will not obey the data locality.
    */
-  private static void assignLeftOverBlocks(Map<String, List<Distributable>> outputMap,
-      Set<Distributable> uniqueBlocks, int noOfBlocksPerNode, List<String> activeNodes) {
+  private static void assignLeftOverBlocks(ArrayList<NodeMultiBlockRelation> outputMap,
+      Set<Distributable> leftOverBlocks, long expectedSizePerNode, List<String> activeNodes,
+      BlockAssignmentStrategy blockAssignmentStrategy) {
+    Map<String, Integer> node2Idx = new HashMap<>(outputMap.size());
+    for (int idx = 0; idx < outputMap.size(); idx++) {
+      node2Idx.put(outputMap.get(idx).getNode(), idx);
+    }
 
+    // iterate all the nodes and try to allocate blocks to the nodes
     if (activeNodes != null) {
       for (String activeNode : activeNodes) {
-        List<Distributable> blockLst = outputMap.get(activeNode);
-        if (null == blockLst) {
+        if (LOGGER.isDebugEnabled()) {
+          LOGGER.debug("Second assignment iteration: assign for executor: " + activeNode);
+        }
+
+        Integer idx;
+        List<Distributable> blockLst;
+        if (node2Idx.containsKey(activeNode)) {
+          idx = node2Idx.get(activeNode);
+          blockLst = outputMap.get(idx).getBlocks();
+        } else {
+          idx = node2Idx.size();
           blockLst = new ArrayList<Distributable>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
         }
-        populateBlocks(uniqueBlocks, noOfBlocksPerNode, blockLst);
-        if (blockLst.size() > 0) {
-          outputMap.put(activeNode, blockLst);
+        populateBlocks(leftOverBlocks, expectedSizePerNode, blockLst, blockAssignmentStrategy);
+
+        if (!node2Idx.containsKey(activeNode) && blockLst.size() > 0) {
+          outputMap.add(idx, new NodeMultiBlockRelation(activeNode, blockLst));
+          node2Idx.put(activeNode, idx);
         }
       }
     } else {
-      for (Map.Entry<String, List<Distributable>> entry : outputMap.entrySet()) {
-        List<Distributable> blockLst = entry.getValue();
-        populateBlocks(uniqueBlocks, noOfBlocksPerNode, blockLst);
+      for (NodeMultiBlockRelation entry : outputMap) {
+        List<Distributable> blockLst = entry.getBlocks();
+        populateBlocks(leftOverBlocks, expectedSizePerNode, blockLst, blockAssignmentStrategy);
       }
-
     }
 
-    for (Map.Entry<String, List<Distributable>> entry : outputMap.entrySet()) {
-      Iterator<Distributable> blocks = uniqueBlocks.iterator();
-      if (blocks.hasNext()) {
-        Distributable block = blocks.next();
-        List<Distributable> blockLst = entry.getValue();
-        blockLst.add(block);
-        blocks.remove();
-      }
+    // if there is still blocks left, allocate them in round robin manner to each nodes
+    assignBlocksUseRoundRobin(outputMap, leftOverBlocks, blockAssignmentStrategy);
+  }
+
+  /**
+   * assign remaining blocks to nodes
+   *
+   * @param remainingBlocks blocks to be allocated
+   * @param expectedSizePerNode expected size for each node
+   * @param blockLst destination for the blocks to be allocated
+   * @param blockAssignmentStrategy block assignment strategy
+   */
+  private static void populateBlocks(Set<Distributable> remainingBlocks,
+      long expectedSizePerNode, List<Distributable> blockLst,
+      BlockAssignmentStrategy blockAssignmentStrategy) {
+    switch (blockAssignmentStrategy) {
+      case BLOCK_NUM_FIRST:
+        populateBlocksByNum(remainingBlocks, expectedSizePerNode, blockLst);
+        break;
+      case BLOCK_SIZE_FIRST:
+        populateBlocksBySize(remainingBlocks, expectedSizePerNode, blockLst);
+        break;
+      default:
+        throw new IllegalArgumentException(
+            "Unsupported block assignment strategy: " + blockAssignmentStrategy);
     }
   }
 
   /**
-   * The method populate the blockLst to be allocate to a specific node.
-   * @param uniqueBlocks
-   * @param noOfBlocksPerNode
-   * @param blockLst
+   * Take a number of distributable blocks from {@code remainingBlocks} and add them to output
+   * {@code blockLst}. After adding, the total number of blocks in {@code blockLst} does not
+   * exceed {@code expectedSizePerNode}.
    */
-  private static void populateBlocks(Set<Distributable> uniqueBlocks, int noOfBlocksPerNode,
-      List<Distributable> blockLst) {
-    Iterator<Distributable> blocks = uniqueBlocks.iterator();
-    //if the node is already having the per block nodes then avoid assign the extra blocks
-    if (blockLst.size() == noOfBlocksPerNode) {
+  private static void populateBlocksByNum(Set<Distributable> remainingBlocks,
+      long expectedSizePerNode, List<Distributable> blockLst) {
+    Iterator<Distributable> blocks = remainingBlocks.iterator();
+    // if the node is already having the per block nodes then avoid assign the extra blocks
+    if (blockLst.size() == expectedSizePerNode) {
       return;
     }
     while (blocks.hasNext()) {
       Distributable block = blocks.next();
       blockLst.add(block);
       blocks.remove();
-      if (blockLst.size() >= noOfBlocksPerNode) {
+      if (blockLst.size() >= expectedSizePerNode) {
         break;
       }
     }
   }
 
   /**
-   * To create the final output of the Node and Data blocks
-   *
-   * @param outputMap
-   * @param blocksPerNode
-   * @param uniqueBlocks
-   * @param nodeAndBlockMapping
-   * @param activeNodes
+   * Take a number of distributable blocks from {@code remainingBlocks} and add them to output
+   * {@code blockLst}. After adding, the total accumulated block size of {@code blockLst}
+   * is no more than roughly {@code expectedSizePerNode}.
    */
-  private static void createOutputMap(Map<String, List<Distributable>> outputMap, int blocksPerNode,
-      Set<Distributable> uniqueBlocks, Map<String, List<Distributable>> nodeAndBlockMapping,
-      List<String> activeNodes) {
+  private static void populateBlocksBySize(Set<Distributable> remainingBlocks,
+      long expectedSizePerNode, List<Distributable> blockLst) {
+    Iterator<Distributable> blocks = remainingBlocks.iterator();
+    //if the node is already having the avg node size then avoid assign the extra blocks
+    long fileSize = 0;
+    for (Distributable block : blockLst) {
+      fileSize += ((TableBlockInfo) block).getBlockLength();
+    }
+    if (fileSize >= expectedSizePerNode) {
+      LOGGER.debug("Capacity is full, skip allocate blocks on this node");
+      return;
+    }
 
-    ArrayList<NodeMultiBlockRelation> multiBlockRelations =
-        new ArrayList<>(nodeAndBlockMapping.size());
-    for (Map.Entry<String, List<Distributable>> entry : nodeAndBlockMapping.entrySet()) {
-      multiBlockRelations.add(new NodeMultiBlockRelation(entry.getKey(), entry.getValue()));
+    while (blocks.hasNext()) {
+      Distributable block = blocks.next();
+      long thisBlockSize = ((TableBlockInfo) block).getBlockLength();
+      if (fileSize < expectedSizePerNode) {
+        // `fileSize==0` means there are no blocks assigned to this node before
+        if (fileSize == 0 || fileSize + thisBlockSize <= expectedSizePerNode * 1.1D) {
+          blockLst.add(block);
+          if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Second Assignment iteration: "
+                + ((TableBlockInfo) block).getFilePath() + "-"
+                + ((TableBlockInfo) block).getBlockLength() + "-->currentNode");
+          }
+          fileSize += thisBlockSize;
+          blocks.remove();
+        }
+      } else {
+        break;
+      }
     }
-    // sort nodes based on number of blocks per node, so that nodes having lesser blocks
-    // are assigned first
-    Collections.sort(multiBlockRelations);
+  }
 
-    for (NodeMultiBlockRelation nodeMultiBlockRelation : multiBlockRelations) {
+  /**
+   * allocate the blocks in round robin manner
+   */
+  private static void assignBlocksUseRoundRobin(ArrayList<NodeMultiBlockRelation> node2Blocks,
+      Set<Distributable> remainingBlocks, BlockAssignmentStrategy blockAssignmentStrategy) {
+    switch (blockAssignmentStrategy) {
+      case BLOCK_NUM_FIRST:
+        roundRobinAssignBlocksByNum(node2Blocks, remainingBlocks);
+        break;
+      case BLOCK_SIZE_FIRST:
+        roundRobinAssignBlocksBySize(node2Blocks, remainingBlocks);
+        break;
+      default:
+        throw new IllegalArgumentException("Unsupported block assignment strategy: "
+            + blockAssignmentStrategy);
+    }
+  }
+
+  private static void roundRobinAssignBlocksByNum(ArrayList<NodeMultiBlockRelation> outputMap,
+      Set<Distributable> remainingBlocks) {
+    for (NodeMultiBlockRelation relation: outputMap) {
+      Iterator<Distributable> blocks = remainingBlocks.iterator();
+      if (blocks.hasNext()) {
+        Distributable block = blocks.next();
+        List<Distributable> blockLst = relation.getBlocks();
+        blockLst.add(block);
+        blocks.remove();
+      }
+    }
+  }
+
+  private static void roundRobinAssignBlocksBySize(ArrayList<NodeMultiBlockRelation> outputMap,
+      Set<Distributable> remainingBlocks) {
+    Iterator<Distributable> blocks = remainingBlocks.iterator();
+    while (blocks.hasNext()) {
+      // sort the allocated node-2-blocks in ascending order, the total data size of first one is
+      // the smallest, so we assign this block to it.
+      Collections.sort(outputMap, NodeMultiBlockRelation.DATA_SIZE_ASC_COMPARATOR);
+      Distributable block = blocks.next();
+      List<Distributable> blockLst = outputMap.get(0).getBlocks();
+      blockLst.add(block);
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug("RoundRobin assignment iteration: "
+            + ((TableBlockInfo) block).getFilePath() + "-"
+            + ((TableBlockInfo) block).getBlockLength() + "-->" + outputMap.get(0).getNode());
+      }
+      blocks.remove();
+    }
+  }
+  /**
+   * allocate distributable blocks to nodes based on data locality
+   */
+  private static void assignBlocksByDataLocality(
+      ArrayList<NodeMultiBlockRelation> outputNode2Blocks,
+      long expectedSizePerNode, Set<Distributable> remainingBlocks,
+      List<NodeMultiBlockRelation> inputNode2Blocks, List<String> activeNodes,
+      BlockAssignmentStrategy blockAssignmentStrategy) {
+    if (BlockAssignmentStrategy.BLOCK_SIZE_FIRST == blockAssignmentStrategy) {
+      // sort nodes based on data size of all blocks per node, so that nodes having bigger size
+      // are assigned first
+      Collections.sort(inputNode2Blocks, NodeMultiBlockRelation.DATA_SIZE_DESC_COMPARATOR);
+    } else {
+      // sort nodes based on number of blocks per node, so that nodes having lesser blocks
+      // are assigned first
+      Collections.sort(inputNode2Blocks);
+    }
+
+    Map<String, Integer> executor2Idx = new HashMap<>();
+    for (NodeMultiBlockRelation nodeMultiBlockRelation : inputNode2Blocks) {
       String nodeName = nodeMultiBlockRelation.getNode();
-      //assign the block to the node only if the node is active
+      // assign the block to the node only if the node is active
       String activeExecutor = nodeName;
       if (null != activeNodes) {
         activeExecutor = getActiveExecutor(activeNodes, nodeName);
@@ -824,29 +943,75 @@ public final class CarbonLoaderUtil {
           continue;
         }
       }
-      // this loop will be for each NODE
-      int nodeCapacity = 0;
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug("First Assignment iteration: assign for executor: " + activeExecutor);
+      }
+
+      List<Distributable> blocksInThisNode = nodeMultiBlockRelation.getBlocks();
+      if (BlockAssignmentStrategy.BLOCK_SIZE_FIRST == blockAssignmentStrategy) {
+        // sort blocks based on block size, so that bigger blocks will be assigned first
+        Collections.sort(blocksInThisNode, TableBlockInfo.DATA_SIZE_DESC_COMPARATOR);
+      }
+
+      long nodeCapacity = 0;
       // loop thru blocks of each Node
       for (Distributable block : nodeMultiBlockRelation.getBlocks()) {
+        if (!remainingBlocks.contains(block)) {
+          // this block has been added before
+          continue;
+        }
+        // first time a block is assigned to this node, initialize its entry
+        if (!executor2Idx.containsKey(activeExecutor)) {
+          Integer idx = executor2Idx.size();
+          outputNode2Blocks.add(idx, new NodeMultiBlockRelation(activeExecutor,
+              new ArrayList<Distributable>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE)));
+          executor2Idx.put(activeExecutor, idx);
+        }
 
-        // check if this is already assigned.
-        if (uniqueBlocks.contains(block)) {
-
-          if (null == outputMap.get(activeExecutor)) {
-            List<Distributable> list =
-                new ArrayList<Distributable>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-            outputMap.put(activeExecutor, list);
-          }
-          // assign this block to this node if node has capacity left
-          if (nodeCapacity < blocksPerNode) {
-            List<Distributable> infos = outputMap.get(activeExecutor);
+        // assign this block to this node if node has capacity left
+        if (BlockAssignmentStrategy.BLOCK_NUM_FIRST == blockAssignmentStrategy) {
+          if (nodeCapacity < expectedSizePerNode) {
+            Integer idx = executor2Idx.get(activeExecutor);
+            List<Distributable> infos = outputNode2Blocks.get(idx).getBlocks();
             infos.add(block);
             nodeCapacity++;
-            uniqueBlocks.remove(block);
+            if (LOGGER.isDebugEnabled()) {
+              LOGGER.debug(
+                  "First Assignment iteration: " + ((TableBlockInfo) block).getFilePath() + '-'
+                      + ((TableBlockInfo) block).getBlockLength() + "-->" + activeExecutor);
+            }
+            remainingBlocks.remove(block);
+          } else {
+            // No need to continue loop as node is full
+            break;
+          }
+        } else if (BlockAssignmentStrategy.BLOCK_SIZE_FIRST == blockAssignmentStrategy) {
+          long thisBlockSize = ((TableBlockInfo) block).getBlockLength();
+          // `nodeCapacity == 0` means that there is a huge block that already exceed the
+          // `expectedSize` of the node, so we have to assign it to some node, otherwise it will
+          // be assigned in the last RoundRobin iteration.
+          if (nodeCapacity == 0 || nodeCapacity < expectedSizePerNode) {
+            if (nodeCapacity == 0 || nodeCapacity + thisBlockSize <= expectedSizePerNode * 1.05D) {
+              Integer idx = executor2Idx.get(activeExecutor);
+              List<Distributable> blocks = outputNode2Blocks.get(idx).getBlocks();
+              blocks.add(block);
+              nodeCapacity += thisBlockSize;
+              if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug(
+                    "First Assignment iteration: " + ((TableBlockInfo) block).getFilePath() + '-'
+                        + ((TableBlockInfo) block).getBlockLength() + "-->" + activeExecutor);
+              }
+              remainingBlocks.remove(block);
+            }
+            // this block is too big for current node and there are still capacity left
+            // for small files, so continue to allocate block on this node in next iteration.
           } else {
             // No need to continue loop as node is full
             break;
           }
+        } else {
+          throw new IllegalArgumentException(
+              "Unsupported block assignment strategy: " + blockAssignmentStrategy);
         }
       }
     }
@@ -890,60 +1055,37 @@ public final class CarbonLoaderUtil {
   }
 
   /**
-   * Create the Node and its related blocks Mapping and put in a Map
+   * Create node to blocks mapping
    *
-   * @param flattenedList
-   * @param nodeAndBlockMapping
+   * @param blockInfos input block info
    */
-  private static void createNodeVsBlockMapping(List<NodeBlockRelation> flattenedList,
-      Map<String, List<Distributable>> nodeAndBlockMapping) {
-    for (NodeBlockRelation nbr : flattenedList) {
-      String node = nbr.getNode();
-      List<Distributable> list;
-
-      if (null == nodeAndBlockMapping.get(node)) {
-        list = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-        list.add(nbr.getBlock());
-        nodeAndBlockMapping.put(node, list);
-      } else {
-        list = nodeAndBlockMapping.get(node);
-        list.add(nbr.getBlock());
-      }
-    }
-    /*for resolving performance issue, removed values() with entrySet () iterating the values and
-    sorting it.entrySet will give the logical view for hashMap and we dont query the map twice for
-    each key whereas values () iterate twice*/
-    Iterator<Map.Entry<String, List<Distributable>>> iterator =
-        nodeAndBlockMapping.entrySet().iterator();
-    while (iterator.hasNext()) {
-      Collections.sort(iterator.next().getValue());
-    }
-  }
+  private static ArrayList<NodeMultiBlockRelation> createNode2BlocksMapping(
+      List<Distributable> blockInfos) {
+    Map<String, Integer> node2Idx = new HashMap<>();
+    ArrayList<NodeMultiBlockRelation> node2Blocks = new ArrayList<>();
 
-  /**
-   * Create the flat List i.e flattening of the Map.
-   *
-   * @param blockInfos
-   * @param flattenedList
-   * @param uniqueBlocks
-   */
-  private static void createFlattenedListFromMap(List<Distributable> blockInfos,
-      List<NodeBlockRelation> flattenedList, Set<Distributable> uniqueBlocks,
-      Set<String> nodeList) {
     for (Distributable blockInfo : blockInfos) {
-      // put the blocks in the set
-      uniqueBlocks.add(blockInfo);
-
       try {
-        for (String eachNode : blockInfo.getLocations()) {
-          NodeBlockRelation nbr = new NodeBlockRelation(blockInfo, eachNode);
-          flattenedList.add(nbr);
-          nodeList.add(eachNode);
+        for (final String eachNode : blockInfo.getLocations()) {
+          if (node2Idx.containsKey(eachNode)) {
+            Integer idx = node2Idx.get(eachNode);
+            List<Distributable> blocks = node2Blocks.get(idx).getBlocks();
+            blocks.add(blockInfo);
+          } else {
+            // add blocks to this node for the first time
+            Integer idx = node2Idx.size();
+            List<Distributable> blocks = new ArrayList<>();
+            blocks.add(blockInfo);
+            node2Blocks.add(idx, new NodeMultiBlockRelation(eachNode, blocks));
+            node2Idx.put(eachNode, idx);
+          }
         }
       } catch (IOException e) {
         throw new RuntimeException("error getting location of block: " + blockInfo.toString(), e);
       }
     }
+
+    return node2Blocks;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/111bb5c4/processing/src/test/java/org/apache/carbondata/processing/util/CarbonLoaderUtilTest.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/processing/util/CarbonLoaderUtilTest.java b/processing/src/test/java/org/apache/carbondata/processing/util/CarbonLoaderUtilTest.java
new file mode 100644
index 0000000..9c66ada
--- /dev/null
+++ b/processing/src/test/java/org/apache/carbondata/processing/util/CarbonLoaderUtilTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.util;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.block.Distributable;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+
+import org.apache.commons.lang3.StringUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class CarbonLoaderUtilTest {
+  private final static LogService LOGGER
+      = LogServiceFactory.getLogService(CarbonLoaderUtilTest.class.getName());
+
+  private List<Distributable> generateBlocks() {
+    List<Distributable> blockInfos = new ArrayList<>();
+    String filePath = "/fakepath";
+    String blockId = "1";
+
+    String[] locations = new String[] { "host2", "host3" };
+    ColumnarFormatVersion version = ColumnarFormatVersion.V1;
+
+    TableBlockInfo tableBlockInfo1 = new TableBlockInfo(filePath + "_a", 0,
+        blockId, locations, 30 * 1024 * 1024, version, null);
+    blockInfos.add(tableBlockInfo1);
+
+    TableBlockInfo tableBlockInfo2 = new TableBlockInfo(filePath + "_b", 0,
+        blockId, locations, 40 * 1024 * 1024, version, null);
+    blockInfos.add(tableBlockInfo2);
+
+    TableBlockInfo tableBlockInfo3 = new TableBlockInfo(filePath + "_c", 0,
+        blockId, locations, 20 * 1024 * 1024, version, null);
+    blockInfos.add(tableBlockInfo3);
+
+    TableBlockInfo tableBlockInfo4 = new TableBlockInfo(filePath + "_d", 0,
+        blockId, locations, 1, version, null);
+    blockInfos.add(tableBlockInfo4);
+
+    TableBlockInfo tableBlockInfo5 = new TableBlockInfo(filePath + "_e", 0,
+        blockId, locations, 1, version, null);
+    blockInfos.add(tableBlockInfo5);
+
+    TableBlockInfo tableBlockInfo6 = new TableBlockInfo(filePath + "_f", 0,
+        blockId, locations, 1, version, null);
+    blockInfos.add(tableBlockInfo6);
+
+    TableBlockInfo tableBlockInfo7 = new TableBlockInfo(filePath + "_g", 0,
+        blockId, locations, 1, version, null);
+    blockInfos.add(tableBlockInfo7);
+    return blockInfos;
+  }
+
+  private List<String> generateExecutors() {
+    List<String> activeNodes = new ArrayList<>();
+    activeNodes.add("host1");
+    activeNodes.add("host2");
+    activeNodes.add("host3");
+    return activeNodes;
+  }
+
+  @Test
+  public void testNodeBlockMappingByDataSize() throws Exception {
+    List<Distributable> blockInfos = generateBlocks();
+    List<String> activeNodes = generateExecutors();
+
+    // the blocks are assigned by size, so the number of block for each node are different
+    Map<String, List<Distributable>> nodeMappingBySize =
+        CarbonLoaderUtil.nodeBlockMapping(blockInfos, -1, activeNodes,
+            CarbonLoaderUtil.BlockAssignmentStrategy.BLOCK_SIZE_FIRST);
+    LOGGER.info(convertMapListAsString(nodeMappingBySize));
+    Assert.assertEquals(3, nodeMappingBySize.size());
+    for (Map.Entry<String, List<Distributable>> entry : nodeMappingBySize.entrySet()) {
+      if (entry.getValue().size() == 1) {
+        // only contains the biggest block
+        Assert.assertEquals(40 * 1024 * 1024L,
+            ((TableBlockInfo) entry.getValue().get(0)).getBlockLength());
+      } else {
+        Assert.assertTrue(entry.getValue().size() > 1);
+      }
+    }
+
+    // the blocks are assigned by number, so the number of blocks for each node are nearly the same
+    Map<String, List<Distributable>> nodeMappingByNum =
+        CarbonLoaderUtil.nodeBlockMapping(blockInfos, -1, activeNodes,
+            CarbonLoaderUtil.BlockAssignmentStrategy.BLOCK_NUM_FIRST);
+    LOGGER.info(convertMapListAsString(nodeMappingByNum));
+    Assert.assertEquals(3, nodeMappingBySize.size());
+    for (Map.Entry<String, List<Distributable>> entry : nodeMappingByNum.entrySet()) {
+      Assert.assertTrue(entry.getValue().size() == blockInfos.size() / 3
+          || entry.getValue().size() == blockInfos.size() / 3 + 1);
+    }
+  }
+
+  private <K, T> String convertMapListAsString(Map<K, List<T>> mapList) {
+    StringBuffer sb = new StringBuffer();
+    for (Map.Entry<K, List<T>> entry : mapList.entrySet()) {
+      String key = entry.getKey().toString();
+      String value = StringUtils.join(entry.getValue(), ", ");
+      sb.append(key).append(" -- ").append(value).append(System.lineSeparator());
+    }
+    return sb.toString();
+  }
+}
\ No newline at end of file


Mime
View raw message