From: jackylk@apache.org
To: commits@carbondata.apache.org
Subject: carbondata git commit: [CARBONDATA-2593] Add an option 'carbon.insert.storage.level' to support configuring the storage level when insert into data with 'carbon.insert.persist.enable'='true'
Date: Fri, 15 Jun 2018 19:32:48 +0000 (UTC)

Repository: carbondata
Updated Branches:
  refs/heads/master f0c88348a -> 181f0ac9b


[CARBONDATA-2593] Add an option 'carbon.insert.storage.level' to support configuring the storage level when insert into data with 'carbon.insert.persist.enable'='true'

When inserting data with 'carbon.insert.persist.enable'='true', the storage level of the dataset is fixed to 'MEMORY_AND_DISK'; it should support configuring the storage level to correspond to
different environments. This closes #2373

Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/181f0ac9
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/181f0ac9
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/181f0ac9

Branch: refs/heads/master
Commit: 181f0ac9bed6ff7d83268f6c058aee943b348ddc
Parents: f0c8834
Author: Zhang Zhichao <441586683@qq.com>
Authored: Thu Jun 14 14:48:47 2018 +0800
Committer: Jacky Li
Committed: Sat Jun 16 03:32:36 2018 +0800

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   | 25 ++++++++++++++++++++
 .../carbondata/core/util/CarbonProperties.java  | 18 ++++++++++++++
 docs/configuration-parameters.md                |  4 ++++
 .../management/CarbonInsertIntoCommand.scala    |  5 ++--
 4 files changed, 50 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/181f0ac9/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index c7281dd..19ff494 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -866,6 +866,7 @@ public final class CarbonCommonConstants {
    * to run load and insert queries on source table concurrently then user can enable this flag
    */
   @CarbonProperty
+  @InterfaceStability.Evolving
   public static final String CARBON_INSERT_PERSIST_ENABLED = "carbon.insert.persist.enable";
 
   /**
@@ -875,6 +876,27 @@ public final class CarbonCommonConstants {
   public static final String CARBON_INSERT_PERSIST_ENABLED_DEFAULT = "false";
 
   /**
+   * Which storage level to persist dataset when insert into data
+   * with 'carbon.insert.persist.enable'='true'
+   */
+  @CarbonProperty
+  @InterfaceStability.Evolving
+  public static final String CARBON_INSERT_STORAGE_LEVEL =
+      "carbon.insert.storage.level";
+
+  /**
+   * The default value(MEMORY_AND_DISK) is the same as the default storage level of Dataset.
+   * Unlike `RDD.cache()`, the default storage level is set to be `MEMORY_AND_DISK` because
+   * recomputing the in-memory columnar representation of the underlying table is expensive.
+   *
+   * if user's executor has less memory, set the CARBON_INSERT_STORAGE_LEVEL
+   * to MEMORY_AND_DISK_SER or other storage level to correspond to different environment.
+   * You can get more recommendations about storage level in spark website:
+   * http://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence.
+   */
+  public static final String CARBON_INSERT_STORAGE_LEVEL_DEFAULT = "MEMORY_AND_DISK";
+
+  /**
    * default name of data base
    */
   public static final String DATABASE_DEFAULT_NAME = "default";
@@ -1094,6 +1116,7 @@ public final class CarbonCommonConstants {
    * to determine to use the rdd persist or not.
    */
   @CarbonProperty
+  @InterfaceStability.Evolving
   public static final String isPersistEnabled = "carbon.update.persist.enable";
 
   /**
@@ -1117,6 +1140,7 @@ public final class CarbonCommonConstants {
    * with 'carbon.update.persist.enable'='true'
    */
   @CarbonProperty
+  @InterfaceStability.Evolving
   public static final String CARBON_UPDATE_STORAGE_LEVEL =
       "carbon.update.storage.level";
@@ -1354,6 +1378,7 @@ public final class CarbonCommonConstants {
    * Which storage level to persist rdd when sort_scope=global_sort
    */
   @CarbonProperty
+  @InterfaceStability.Evolving
   public static final String CARBON_GLOBAL_SORT_RDD_STORAGE_LEVEL =
       "carbon.global.sort.rdd.storage.level";


http://git-wip-us.apache.org/repos/asf/carbondata/blob/181f0ac9/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 6eb7de6..b134a7c 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -1581,4 +1581,22 @@ public final class CarbonProperties {
       return defaultValue;
     }
   }
+
+  /**
+   * Return valid storage level for CARBON_INSERT_STORAGE_LEVEL
+   * @return String
+   */
+  public String getInsertIntoDatasetStorageLevel() {
+    String storageLevel = getProperty(CarbonCommonConstants.CARBON_INSERT_STORAGE_LEVEL,
+        CarbonCommonConstants.CARBON_INSERT_STORAGE_LEVEL_DEFAULT);
+    boolean validateStorageLevel = CarbonUtil.isValidStorageLevel(storageLevel);
+    if (!validateStorageLevel) {
+      LOGGER.warn("The " + CarbonCommonConstants.CARBON_INSERT_STORAGE_LEVEL
+          + " configuration value is invalid. It will use default storage level("
+          + CarbonCommonConstants.CARBON_INSERT_STORAGE_LEVEL_DEFAULT
+          + ") to persist dataset.");
+      storageLevel = CarbonCommonConstants.CARBON_INSERT_STORAGE_LEVEL_DEFAULT;
+    }
+    return storageLevel.toUpperCase();
+  }
 }


http://git-wip-us.apache.org/repos/asf/carbondata/blob/181f0ac9/docs/configuration-parameters.md
----------------------------------------------------------------------
diff --git a/docs/configuration-parameters.md b/docs/configuration-parameters.md
index 11cc6ea..f81959e 100644
--- a/docs/configuration-parameters.md
+++ b/docs/configuration-parameters.md
@@ -55,7 +55,11 @@ This section provides the details of all the configurations required for CarbonD
 | carbon.max.driver.lru.cache.size | -1 | Max LRU cache size upto which data will be loaded at the driver side. This value is expressed in MB. Default value of -1 means there is no memory limit for caching. Only integer values greater than 0 are accepted. | |
 | carbon.max.executor.lru.cache.size | -1 | Max LRU cache size upto which data will be loaded at the executor side. This value is expressed in MB. Default value of -1 means there is no memory limit for caching. Only integer values greater than 0 are accepted. If this parameter is not configured, then the carbon.max.driver.lru.cache.size value will be considered. | |
 | carbon.merge.sort.prefetch | true | Enable prefetch of data during merge sort while reading data from sort temp files in data loading. | |
+| carbon.insert.persist.enable | false | Enabling this parameter considers persistent data. If we are executing insert into query from source table using select statement & loading the same source table concurrently, when select happens on source table during the data load, it gets new record for which dictionary is not generated, so there will be inconsistency. To avoid this condition we can persist the dataframe into MEMORY_AND_DISK(default value) and perform insert into operation. By default this value will be false because no need to persist the dataframe in all cases. If user wants to run load and insert queries on source table concurrently then user can enable this parameter. | |
+| carbon.insert.storage.level | MEMORY_AND_DISK | Which storage level to persist dataframe when 'carbon.insert.persist.enable'=true, if user's executor has less memory, set this parameter to 'MEMORY_AND_DISK_SER' or other storage level to correspond to different environment. [See detail](http://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence). | |
 | carbon.update.persist.enable | true | Enabling this parameter considers persistent data. Enabling this will reduce the execution time of UPDATE operation. | |
+| carbon.update.storage.level | MEMORY_AND_DISK | Which storage level to persist dataframe when 'carbon.update.persist.enable'=true, if user's executor has less memory, set this parameter to 'MEMORY_AND_DISK_SER' or other storage level to correspond to different environment. [See detail](http://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence). | |
+| carbon.global.sort.rdd.storage.level | MEMORY_ONLY | Which storage level to persist rdd when loading data with 'sort_scope'='global_sort', if user's executor has less memory, set this parameter to 'MEMORY_AND_DISK_SER' or other storage level to correspond to different environment. [See detail](http://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence). | |
 | carbon.load.global.sort.partitions | 0 | The Number of partitions to use when shuffling data for sort. If user don't configurate or configurate it less than 1, it uses the number of map tasks as reduce tasks. In general, we recommend 2-3 tasks per CPU core in your cluster. |
 | carbon.options.bad.records.logger.enable | false | Whether to create logs with details about bad records. | |
 | carbon.bad.records.action | FORCE | This property can have four types of actions for bad records FORCE, REDIRECT, IGNORE and FAIL. If set to FORCE then it auto-corrects the data by storing the bad records as NULL. If set to REDIRECT then bad records are written to the raw CSV instead of being loaded. If set to IGNORE then bad records are neither loaded nor written to the raw CSV. If set to FAIL then data loading fails if any bad records are found. | |


http://git-wip-us.apache.org/repos/asf/carbondata/blob/181f0ac9/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
index 702f954..6c74ad2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
@@ -53,8 +53,9 @@ case class CarbonInsertIntoCommand(
     val df = if (isPersistRequired) {
       LOGGER.info("Persist enabled for Insert operation")
-      Dataset.ofRows(sparkSession, child)
-        .persist(StorageLevel.MEMORY_AND_DISK)
+      Dataset.ofRows(sparkSession, child).persist(
+        StorageLevel.fromString(
+          CarbonProperties.getInstance.getInsertIntoDatasetStorageLevel))
     } else {
       Dataset.ofRows(sparkSession, child)
     }
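As a usage sketch of the two options this patch documents (the values below are illustrative, not part of the commit): a deployment whose executors have limited memory could enable persist-on-insert and choose a serialized storage level in carbon.properties:

```properties
# Illustrative carbon.properties fragment (values chosen for a
# memory-constrained executor; not taken from this commit)
carbon.insert.persist.enable=true
carbon.insert.storage.level=MEMORY_AND_DISK_SER
```

Note that an invalid storage level name is not a hard error: per the getInsertIntoDatasetStorageLevel method above, CarbonData logs a warning and falls back to the default MEMORY_AND_DISK.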
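The validate-and-fall-back behaviour of getInsertIntoDatasetStorageLevel can be illustrated without a CarbonData or Spark dependency. In this standalone sketch, VALID_LEVELS and resolveStorageLevel are illustrative stand-ins (they are not CarbonData API) for CarbonUtil.isValidStorageLevel and the property lookup; the level names listed are those accepted by Spark's StorageLevel.fromString:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Standalone sketch of CarbonProperties.getInsertIntoDatasetStorageLevel():
// validate the configured level, fall back to the default if invalid,
// and return the name upper-cased so StorageLevel.fromString accepts it.
public class StorageLevelResolver {

  // Storage level names understood by Spark's StorageLevel.fromString.
  private static final Set<String> VALID_LEVELS = new HashSet<>(Arrays.asList(
      "NONE", "DISK_ONLY", "DISK_ONLY_2", "MEMORY_ONLY", "MEMORY_ONLY_2",
      "MEMORY_ONLY_SER", "MEMORY_ONLY_SER_2", "MEMORY_AND_DISK",
      "MEMORY_AND_DISK_2", "MEMORY_AND_DISK_SER", "MEMORY_AND_DISK_SER_2",
      "OFF_HEAP"));

  // Mirrors CARBON_INSERT_STORAGE_LEVEL_DEFAULT from the patch.
  static final String DEFAULT_LEVEL = "MEMORY_AND_DISK";

  // Returns the configured level (upper-cased) if valid, else the default.
  static String resolveStorageLevel(String configured) {
    if (configured == null || !VALID_LEVELS.contains(configured.toUpperCase())) {
      return DEFAULT_LEVEL;
    }
    return configured.toUpperCase();
  }

  public static void main(String[] args) {
    System.out.println(resolveStorageLevel("memory_and_disk_ser")); // MEMORY_AND_DISK_SER
    System.out.println(resolveStorageLevel("not_a_level"));         // MEMORY_AND_DISK
  }
}
```

The design choice mirrored here is that misconfiguration degrades gracefully to the default rather than failing the insert, which matters because the property is read at query time, long after it was set.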