carbondata-commits mailing list archives

From chenliang...@apache.org
Subject [25/50] [abbrv] incubator-carbondata git commit: [CARBONDATA-58] data loading is launched with wrong number of tasks (#820)
Date Wed, 20 Jul 2016 10:13:53 GMT
[CARBONDATA-58] data loading is launched with wrong number of tasks (#820)

Fixed the case where the number of data load job executors is less than the number of HDFS
nodes. Data loading now ensures all required executors are launched and distributes one task
to each active node.
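
For context, a minimal sketch of the fixed load path in Scala (the two calls are taken
verbatim from this commit's diff; blockList and sc are assumed to be in scope as in
CarbonDataRDDFactory):

    // Ensure the requested executors are up, then map blocks so that
    // every active node receives a task (sketch; surrounding setup assumed).
    val activeNodes = DistributionUtil
      .ensureExecutorsAndGetNodeList(blockList, sc.sparkContext)
    val nodeBlockMapping = CarbonLoaderUtil
      .nodeBlockMapping(blockList.toSeq.asJava, -1, activeNodes.toList.asJava)
      .asScala.toSeq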

Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/00ff2146
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/00ff2146
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/00ff2146

Branch: refs/heads/master
Commit: 00ff2146ddd1b9738621531d6957fc7c309d5f88
Parents: b466c10
Author: Mohammad Shahid Khan <mohdshahidkhan1987@gmail.com>
Authored: Mon Jul 18 09:49:47 2016 +0530
Committer: Venkata Ramana G <g.ramana.v1@gmail.com>
Committed: Mon Jul 18 09:49:47 2016 +0530

----------------------------------------------------------------------
 .../carbondata/spark/load/CarbonLoaderUtil.java |  2 +-
 .../spark/sql/hive/DistributionUtil.scala       | 22 +++++++++++++-------
 .../spark/rdd/CarbonDataRDDFactory.scala        | 10 ++++++++-
 .../carbondata/spark/rdd/CarbonQueryRDD.scala   | 13 +-----------
 4 files changed, 26 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/00ff2146/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoaderUtil.java b/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoaderUtil.java
index 8bb8598..3d17141 100644
--- a/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoaderUtil.java
+++ b/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoaderUtil.java
@@ -1066,7 +1066,7 @@ public final class CarbonLoaderUtil {
    * @return
    */
   public static Map<String, List<Distributable>> nodeBlockMapping(List<Distributable> blockInfos,
-      int noOfNodesInput, List activeNodes) {
+      int noOfNodesInput, List<String> activeNodes) {
 
     Map<String, List<Distributable>> nodeBlocksMap =
         new HashMap<String, List<Distributable>>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
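
The third parameter is new: callers now pass the list of active nodes so blocks can be
spread across all of them. For illustration only, a hypothetical round-robin version of
that distribution in Scala (not CarbonData code; the real nodeBlockMapping also takes
block placement into account, which this sketch ignores):

    import org.carbondata.core.carbon.datastore.block.Distributable

    // Hypothetical helper: deal blocks out to the active nodes in turn
    // so each node ends up with work.
    def roundRobinMapping(blocks: Seq[Distributable],
        activeNodes: Seq[String]): Map[String, Seq[Distributable]] = {
      blocks.zipWithIndex
        .groupBy { case (_, i) => activeNodes(i % activeNodes.size) }
        .mapValues(_.map(_._1)).toMap
    }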

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/00ff2146/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
index baff2c7..52d76b3 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
@@ -18,7 +18,6 @@ package org.apache.spark.sql.hive
 
 
 import java.net.{InetAddress, InterfaceAddress, NetworkInterface}
-import java.util
 
 import scala.collection.JavaConverters._
 
@@ -27,8 +26,7 @@ import org.apache.spark.sql.CarbonContext
 
 import org.carbondata.common.logging.LogServiceFactory
 import org.carbondata.core.carbon.datastore.block.Distributable
-
-
+import org.carbondata.spark.load.CarbonLoaderUtil
 
 /**
  *
@@ -108,15 +106,25 @@ object DistributionUtil {
    * Checking if the existing executors is greater than configured executors, if yes
    * returning configured executors.
    *
-   * @param nodeMapping
-   * @param confExecutorsTemp
+   * @param blockList
    * @param sparkContext
    * @return
    */
-  def ensureExecutorsAndGetNodeList(nodeMapping: util.Map[String, util.List[Distributable]],
-    confExecutorsTemp: String,
+  def ensureExecutorsAndGetNodeList(blockList: Array[Distributable],
     sparkContext: SparkContext):
   Array[String] = {
+    val nodeMapping = CarbonLoaderUtil.getRequiredExecutors(blockList.toSeq.asJava)
+    var confExecutorsTemp: String = null
+    if (sparkContext.getConf.contains("spark.executor.instances")) {
+      confExecutorsTemp = sparkContext.getConf.get("spark.executor.instances")
+    } else if (sparkContext.getConf.contains("spark.dynamicAllocation.enabled")
+      && sparkContext.getConf.get("spark.dynamicAllocation.enabled").trim
+      .equalsIgnoreCase("true")) {
+      if (sparkContext.getConf.contains("spark.dynamicAllocation.maxExecutors")) {
+        confExecutorsTemp = sparkContext.getConf.get("spark.dynamicAllocation.maxExecutors")
+      }
+    }
+
     val confExecutors = if (null != confExecutorsTemp) confExecutorsTemp.toInt else 1
     val requiredExecutors = if (nodeMapping.size > confExecutors) {
       confExecutors
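
With the new signature, the executor count is derived inside the method from SparkConf:
spark.executor.instances if set, else spark.dynamicAllocation.maxExecutors when dynamic
allocation is enabled, else 1. A hedged usage sketch (blockList is an assumed
Array[Distributable] built from the input splits):

    // Callers now pass only the blocks and the SparkContext; the
    // configured executor count is resolved internally as shown above.
    val activeNodes: Array[String] =
      DistributionUtil.ensureExecutorsAndGetNodeList(blockList, sparkContext)

Centralizing the config lookup here is what lets CarbonQueryRDD drop its duplicated copy
of the same logic in the last hunk below.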

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/00ff2146/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 12149c5..7271ee0 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -32,6 +32,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileSplit
 import org.apache.spark.{Logging, Partition, SparkContext, SparkEnv}
 import org.apache.spark.sql.{CarbonEnv, CarbonRelation, SQLContext}
 import org.apache.spark.sql.execution.command.{AlterTableModel, CompactionModel, Partitioner}
+import org.apache.spark.sql.hive.DistributionUtil
 import org.apache.spark.util.{FileUtils, SplitUtils}
 
 import org.carbondata.common.logging.LogServiceFactory
@@ -721,8 +722,15 @@ object CarbonDataRDDFactory extends Logging {
           }
           )
           // group blocks to nodes, tasks
+          val startTime = System.currentTimeMillis
+          val activeNodes = DistributionUtil
+            .ensureExecutorsAndGetNodeList(blockList, sc.sparkContext)
           val nodeBlockMapping =
-            CarbonLoaderUtil.nodeBlockMapping(blockList.toSeq.asJava, -1).asScala.toSeq
+            CarbonLoaderUtil
+              .nodeBlockMapping(blockList.toSeq.asJava, -1, activeNodes.toList.asJava).asScala
+              .toSeq
+          val timeElapsed: Long = System.currentTimeMillis - startTime
+          logInfo("Total Time taken in block allocation : " + timeElapsed)
           logInfo("Total no of blocks : " + blockList.size
             + ", No.of Nodes : " + nodeBlockMapping.size
           )

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/00ff2146/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonQueryRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonQueryRDD.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonQueryRDD.scala
index 9d81f0f..d65a54b 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonQueryRDD.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonQueryRDD.scala
@@ -108,20 +108,9 @@ class CarbonQueryRDD[V: ClassTag](
       )
       if (blockList.nonEmpty) {
         // group blocks to nodes, tasks
-        val requiredExecutors = CarbonLoaderUtil.getRequiredExecutors(blockList.asJava)
-        var confExecutors : String = null
-        if (sparkContext.getConf.contains("spark.executor.instances")) {
-          confExecutors = sparkContext.getConf.get("spark.executor.instances")
-        } else if (sparkContext.getConf.contains("spark.dynamicAllocation.enabled")
-          && sparkContext.getConf.get("spark.dynamicAllocation.enabled").trim
-          .equalsIgnoreCase("true")) {
-          if (sparkContext.getConf.contains("spark.dynamicAllocation.maxExecutors")) {
-            confExecutors = sparkContext.getConf.get("spark.dynamicAllocation.maxExecutors")
-          }
-        }
         val startTime = System.currentTimeMillis
         val activeNodes = DistributionUtil
-          .ensureExecutorsAndGetNodeList(requiredExecutors, confExecutors, sparkContext)
+          .ensureExecutorsAndGetNodeList(blockList.toArray, sparkContext)
         val nodeBlockMapping =
           CarbonLoaderUtil.nodeBlockTaskMapping(blockList.asJava, -1, defaultParallelism,
             activeNodes.toList.asJava

