From: jackylk
To: issues@carbondata.incubator.apache.org
Reply-To: issues@carbondata.incubator.apache.org
Subject: [GitHub] incubator-carbondata pull request #362: [CARBONDATA-459] Block distribution ...
Content-Type: text/plain
Message-Id: <20161201011319.C0936E04BB@git1-us-west.apache.org>
Date: Thu, 1 Dec 2016 01:13:19 +0000 (UTC)

GitHub user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/362#discussion_r90365594

    --- Diff: integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala ---
    @@ -101,47 +101,107 @@ object DistributionUtil {
        * Checking if the existing executors is greater than configured executors, if yes
        * returning configured executors.
        *
    -   * @param blockList
    +   * @param blockList total number of blocks in the identified segments
        * @param sparkContext
        * @return
        */
       def ensureExecutorsAndGetNodeList(blockList: Seq[Distributable],
           sparkContext: SparkContext): Seq[String] = {
         val nodeMapping = CarbonLoaderUtil.getRequiredExecutors(blockList.asJava)
    -    var confExecutorsTemp: String = null
    -    if (sparkContext.getConf.contains("spark.executor.instances")) {
    -      confExecutorsTemp = sparkContext.getConf.get("spark.executor.instances")
    -    } else if (sparkContext.getConf.contains("spark.dynamicAllocation.enabled")
    -      && sparkContext.getConf.get("spark.dynamicAllocation.enabled").trim
    -      .equalsIgnoreCase("true")) {
    -      if (sparkContext.getConf.contains("spark.dynamicAllocation.maxExecutors")) {
    -        confExecutorsTemp = sparkContext.getConf.get("spark.dynamicAllocation.maxExecutors")
    +    ensureExecutorsByNumberAndGetNodeList(nodeMapping, blockList, sparkContext)
    +  }
    +
    +  /**
    +   * This method will ensure that the required/configured number of executors are requested
    +   * for processing the identified blocks
    +   *
    +   * @param nodeMapping
    +   * @param blockList
    +   * @param sparkContext
    +   * @return
    +   */
    +  private def ensureExecutorsByNumberAndGetNodeList(nodeMapping: java.util.Map[String, java.util
    +  .List[Distributable]], blockList: Seq[Distributable],
    +      sparkContext: SparkContext): Seq[String] = {
    +    val nodesOfData = nodeMapping.size()
    +    val confExecutors: Int = getConfiguredExecutors(sparkContext)
    +    LOGGER.info("Executors configured : " + confExecutors)
    +    val requiredExecutors = if (nodesOfData < 1 || nodesOfData > confExecutors) {
    +      confExecutors
    +    } else if (confExecutors > nodesOfData) {
    +      // this case will come only if dynamic allocation is true
    +      var totalExecutorsToBeRequested = nodesOfData
    +      // If total number of blocks are greater than the nodes identified then ensure
    +      // that the configured number of max executors can be opened based on the difference of
    +      // block list size and nodes identified
    +      if (blockList.size > nodesOfData) {
    +        // e.g 1. blockList size = 40, nodesOfData = 3, confExecutors = 6, then all executors
    +        // need to be opened
    +        // 2. blockList size = 4, nodesOfData = 3, confExecutors = 6, then
    +        // total 4 executors need to be opened
    +        val extraExecutorsToBeRequested = blockList.size - nodesOfData
    +        if (extraExecutorsToBeRequested > confExecutors) {
    +          totalExecutorsToBeRequested = confExecutors
    +        } else {
    +          totalExecutorsToBeRequested = nodesOfData + extraExecutorsToBeRequested
    +        }
           }
    +      LOGGER.info("Total executors requested: " + totalExecutorsToBeRequested)
    +      totalExecutorsToBeRequested
    +    } else {
    +      nodesOfData
         }
    -    val confExecutors = if (null != confExecutorsTemp) {
    -      confExecutorsTemp.toInt
    +    val startTime = System.currentTimeMillis();
    +    if (sparkContext.getConf.getBoolean("spark.dynamicAllocation.enabled", false)) {
    +      // this case will come only if dynamic allocation is true
    +      CarbonContext
    +        .ensureExecutors(sparkContext, requiredExecutors, blockList.size, Map.empty)
         } else {
    -      1
    +      CarbonContext.ensureExecutors(sparkContext, requiredExecutors)
         }
    -    val requiredExecutors = if (nodeMapping.size > confExecutors) {
    -      confExecutors
    +    getDistinctNodesList(sparkContext, requiredExecutors, startTime)
    +  }
    +
    +  /**
    +   * This method will return the configured executors
    +   *
    +   * @param sparkContext
    +   * @return
    +   */
    +  private def getConfiguredExecutors(sparkContext: SparkContext): Int = {
    +    var confExecutors: Int = 0
    +    if (sparkContext.getConf.getBoolean("spark.dynamicAllocation.enabled", false)) {
    +      // default value for spark.dynamicAllocation.maxExecutors is infinity
    +      confExecutors = sparkContext.getConf.getInt("spark.dynamicAllocation.maxExecutors", 1)
         } else {
    -      nodeMapping.size()
    +      // default value for spark.executor.instances is 2
    +      confExecutors = sparkContext.getConf.getInt("spark.executor.instances", 1)
         }
    +    confExecutors
    +  }
    -    val startTime = System.currentTimeMillis()
    -    CarbonContext.ensureExecutors(sparkContext, requiredExecutors)
    +  /**
    +   * This method will return the distinct nodes list
    +   *
    +   * @param sparkContext
    +   * @param requiredExecutors
    +   * @param startTime
    +   * @return
    +   */
    +  private def getDistinctNodesList(sparkContext: SparkContext,
    +      requiredExecutors: Int,
    +      startTime: Long): Seq[String] = {
         var nodes = DistributionUtil.getNodeList(sparkContext)
    -    var maxTimes = 30
    +    var maxTimes = 10;
    --- End diff --

    Can we calculate this from configuration instead of hard-coding it?
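
A rough sketch of what deriving `maxTimes` from configuration might look like. It assumes `maxTimes` is the number of polling attempts made with a fixed sleep between them, which the diff excerpt does not show. The two property names are hypothetical and not taken from CarbonData or Spark. The defaults are picked only so that an empty configuration reproduces the hard-coded value of 10.

```scala
object RetryConfigSketch {
  // Hypothetical property names, used for illustration only.
  val TimeoutKey = "example.executor.startup.timeout.ms"
  val IntervalKey = "example.executor.poll.interval.ms"

  // Assumed defaults: 5000 ms / 500 ms = 10 attempts, matching the literal in the diff.
  val DefaultTimeoutMs = 5000L
  val DefaultIntervalMs = 500L

  /** Number of polling attempts that fit into the configured timeout, never less than 1. */
  def maxPollAttempts(conf: Map[String, String]): Int = {
    val timeoutMs = readPositiveLong(conf, TimeoutKey, DefaultTimeoutMs)
    val intervalMs = readPositiveLong(conf, IntervalKey, DefaultIntervalMs)
    // Round up so that a partial interval still counts as one attempt.
    val attempts = timeoutMs / intervalMs + (if (timeoutMs % intervalMs != 0) 1L else 0L)
    math.min(attempts, Int.MaxValue.toLong).toInt
  }

  // Falls back to the default when the value is missing, unparsable, or not positive.
  private def readPositiveLong(conf: Map[String, String], key: String, default: Long): Long =
    conf.get(key)
      .flatMap(v => scala.util.Try(v.trim.toLong).toOption)
      .filter(_ > 0)
      .getOrElse(default)
}
```

A plain `Map` keeps the sketch self-contained. Inside `getDistinctNodesList` the same values could presumably be read with `sparkContext.getConf.getLong(key, default)` instead, and the result assigned to `maxTimes`.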