carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jack...@apache.org
Subject [27/50] [abbrv] carbondata git commit: [CARBONDATA-2593] Add an option 'carbon.insert.storage.level' to support configuring the storage level when insert into data with 'carbon.insert.persist.enable'='true'
Date Fri, 22 Jun 2018 01:34:28 GMT
[CARBONDATA-2593] Add an option 'carbon.insert.storage.level' to support configuring the storage
level when inserting data with 'carbon.insert.persist.enable'='true'

When inserting data with 'carbon.insert.persist.enable'='true', the storage level of the dataset
is 'MEMORY_AND_DISK';
it should be possible to configure the storage level to suit different environments.

This closes #2373


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/181f0ac9
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/181f0ac9
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/181f0ac9

Branch: refs/heads/carbonstore
Commit: 181f0ac9bed6ff7d83268f6c058aee943b348ddc
Parents: f0c8834
Author: Zhang Zhichao <441586683@qq.com>
Authored: Thu Jun 14 14:48:47 2018 +0800
Committer: Jacky Li <jacky.likun@qq.com>
Committed: Sat Jun 16 03:32:36 2018 +0800

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   | 25 ++++++++++++++++++++
 .../carbondata/core/util/CarbonProperties.java  | 18 ++++++++++++++
 docs/configuration-parameters.md                |  4 ++++
 .../management/CarbonInsertIntoCommand.scala    |  5 ++--
 4 files changed, 50 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/181f0ac9/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index c7281dd..19ff494 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -866,6 +866,7 @@ public final class CarbonCommonConstants {
    * to run load and insert queries on source table concurrently then user can enable this
flag
    */
   @CarbonProperty
+  @InterfaceStability.Evolving
   public static final String CARBON_INSERT_PERSIST_ENABLED = "carbon.insert.persist.enable";
 
   /**
@@ -875,6 +876,27 @@ public final class CarbonCommonConstants {
   public static final String CARBON_INSERT_PERSIST_ENABLED_DEFAULT = "false";
 
   /**
+   * Which storage level to persist dataset when insert into data
+   * with 'carbon.insert.persist.enable'='true'
+   */
+  @CarbonProperty
+  @InterfaceStability.Evolving
+  public static final String CARBON_INSERT_STORAGE_LEVEL =
+      "carbon.insert.storage.level";
+
+  /**
+   * The default value(MEMORY_AND_DISK) is the same as the default storage level of Dataset.
+   * Unlike `RDD.cache()`, the default storage level is set to be `MEMORY_AND_DISK` because
+   * recomputing the in-memory columnar representation of the underlying table is expensive.
+   *
+   * if user's executor has less memory, set the CARBON_INSERT_STORAGE_LEVEL
+   * to MEMORY_AND_DISK_SER or other storage level to correspond to different environment.
+   * You can get more recommendations about storage level in spark website:
+   * http://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence.
+   */
+  public static final String CARBON_INSERT_STORAGE_LEVEL_DEFAULT = "MEMORY_AND_DISK";
+
+  /**
    * default name of data base
    */
   public static final String DATABASE_DEFAULT_NAME = "default";
@@ -1094,6 +1116,7 @@ public final class CarbonCommonConstants {
    * to determine to use the rdd persist or not.
    */
   @CarbonProperty
+  @InterfaceStability.Evolving
   public static final String isPersistEnabled = "carbon.update.persist.enable";
 
   /**
@@ -1117,6 +1140,7 @@ public final class CarbonCommonConstants {
    * with 'carbon.update.persist.enable'='true'
    */
   @CarbonProperty
+  @InterfaceStability.Evolving
   public static final String CARBON_UPDATE_STORAGE_LEVEL =
       "carbon.update.storage.level";
 
@@ -1354,6 +1378,7 @@ public final class CarbonCommonConstants {
    * Which storage level to persist rdd when sort_scope=global_sort
    */
   @CarbonProperty
+  @InterfaceStability.Evolving
   public static final String CARBON_GLOBAL_SORT_RDD_STORAGE_LEVEL =
       "carbon.global.sort.rdd.storage.level";
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/181f0ac9/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 6eb7de6..b134a7c 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -1581,4 +1581,22 @@ public final class CarbonProperties {
       return defaultValue;
     }
   }
+
+  /**
+   * Return valid storage level for CARBON_INSERT_STORAGE_LEVEL
+   * @return String
+   */
+  public String getInsertIntoDatasetStorageLevel() {
+    String storageLevel = getProperty(CarbonCommonConstants.CARBON_INSERT_STORAGE_LEVEL,
+        CarbonCommonConstants.CARBON_INSERT_STORAGE_LEVEL_DEFAULT);
+    boolean validateStorageLevel = CarbonUtil.isValidStorageLevel(storageLevel);
+    if (!validateStorageLevel) {
+      LOGGER.warn("The " + CarbonCommonConstants.CARBON_INSERT_STORAGE_LEVEL
+          + " configuration value is invalid. It will use default storage level("
+          + CarbonCommonConstants.CARBON_INSERT_STORAGE_LEVEL_DEFAULT
+          + ") to persist dataset.");
+      storageLevel = CarbonCommonConstants.CARBON_INSERT_STORAGE_LEVEL_DEFAULT;
+    }
+    return storageLevel.toUpperCase();
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/181f0ac9/docs/configuration-parameters.md
----------------------------------------------------------------------
diff --git a/docs/configuration-parameters.md b/docs/configuration-parameters.md
index 11cc6ea..f81959e 100644
--- a/docs/configuration-parameters.md
+++ b/docs/configuration-parameters.md
@@ -55,7 +55,11 @@ This section provides the details of all the configurations required for
CarbonD
 | carbon.max.driver.lru.cache.size | -1 | Max LRU cache size upto which data will be loaded
at the driver side. This value is expressed in MB. Default value of -1 means there is no memory
limit for caching. Only integer values greater than 0 are accepted. |  |
 | carbon.max.executor.lru.cache.size | -1 | Max LRU cache size upto which data will be loaded
at the executor side. This value is expressed in MB. Default value of -1 means there is no
memory limit for caching. Only integer values greater than 0 are accepted. If this parameter
is not configured, then the carbon.max.driver.lru.cache.size value will be considered. | 
|
 | carbon.merge.sort.prefetch | true | Enable prefetch of data during merge sort while reading
data from sort temp files in data loading. |  |
+| carbon.insert.persist.enable | false | Enabling this parameter considers persistent data.
If we are executing insert into query from source table using select statement & loading
the same source table concurrently, when select happens on source table during the data load,
it gets new record for which dictionary is not generated, so there will be inconsistency.
To avoid this condition we can persist the dataframe into MEMORY_AND_DISK(default value) and
perform insert into operation. By default this value will be false because no need to persist
the dataframe in all cases. If user wants to run load and insert queries on source table concurrently
then user can enable this parameter. |  |
+| carbon.insert.storage.level | MEMORY_AND_DISK | Which storage level to persist dataframe
when 'carbon.insert.persist.enable'=true, if user's executor has less memory, set this parameter
to 'MEMORY_AND_DISK_SER' or other storage level to correspond to different environment. [See
detail](http://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence). |
 |
 | carbon.update.persist.enable | true | Enabling this parameter considers persistent data.
Enabling this will reduce the execution time of UPDATE operation. |  |
+| carbon.update.storage.level | MEMORY_AND_DISK | Which storage level to persist dataframe
when 'carbon.update.persist.enable'=true, if user's executor has less memory, set this parameter
to 'MEMORY_AND_DISK_SER' or other storage level to correspond to different environment. [See
detail](http://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence). |
 |
+| carbon.global.sort.rdd.storage.level | MEMORY_ONLY | Which storage level to persist rdd
when loading data with 'sort_scope'='global_sort', if user's executor has less memory, set
this parameter to 'MEMORY_AND_DISK_SER' or other storage level to correspond to different
environment. [See detail](http://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence).
|  |
 | carbon.load.global.sort.partitions | 0 | The number of partitions to use when shuffling
data for sort. If the user does not configure it, or configures it to less than 1, the number
of map tasks is used as the number of reduce tasks. In general, we recommend 2-3 tasks per CPU core in your cluster.
 | carbon.options.bad.records.logger.enable | false | Whether to create logs with details
about bad records. | |
 | carbon.bad.records.action | FORCE | This property can have four types of actions for bad
records FORCE, REDIRECT, IGNORE and FAIL. If set to FORCE then it auto-corrects the data by
storing the bad records as NULL. If set to REDIRECT then bad records are written to the raw
CSV instead of being loaded. If set to IGNORE then bad records are neither loaded nor written
to the raw CSV. If set to FAIL then data loading fails if any bad records are found. | |

http://git-wip-us.apache.org/repos/asf/carbondata/blob/181f0ac9/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
index 702f954..6c74ad2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
@@ -53,8 +53,9 @@ case class CarbonInsertIntoCommand(
     val df =
       if (isPersistRequired) {
         LOGGER.info("Persist enabled for Insert operation")
-        Dataset.ofRows(sparkSession, child)
-          .persist(StorageLevel.MEMORY_AND_DISK)
+        Dataset.ofRows(sparkSession, child).persist(
+          StorageLevel.fromString(
+            CarbonProperties.getInstance.getInsertIntoDatasetStorageLevel))
       } else {
         Dataset.ofRows(sparkSession, child)
       }


Mime
View raw message