carbondata-commits mailing list archives

From jack...@apache.org
Subject carbondata git commit: [CARBONDATA-3008] Optimize default value for multiple temp dir
Date Wed, 24 Oct 2018 02:59:02 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master 8af737204 -> b21a6d49f


[CARBONDATA-3008] Optimize default value for multiple temp dir

The feature of supporting multiple temp dirs for data loading was
introduced about 1.5 years ago to solve the single-disk hot-spot
problem. After a year of verification in real production environments,
the feature has proven effective and correct. So in this commit, we
change the default behavior of this feature from disabled to enabled.

Moreover, we remove the parameter 'carbon.use.multiple.temp.dir' and
keep only the parameter 'carbon.use.local.dir', which is now enabled
by default. If the cluster is not configured with yarn-local-dirs, the
java temp dir will be used.

This closes #2824
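
For illustration, a minimal sketch of how a deployment could opt out of
the new default programmatically (the constant and the addProperty call
are taken from this commit; the opt-out scenario itself is an assumed
example):

    // sketch: disable the YARN-local-dir behavior and fall back to
    // java.io.tmpdir only; "false" reverses the new default of "true"
    import org.apache.carbondata.core.constants.CarbonCommonConstants
    import org.apache.carbondata.core.util.CarbonProperties

    CarbonProperties.getInstance().addProperty(
      CarbonCommonConstants.CARBON_LOADING_USE_YARN_LOCAL_DIR, "false")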


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/b21a6d49
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/b21a6d49
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/b21a6d49

Branch: refs/heads/master
Commit: b21a6d49f8a7d99a6bbe804949d22cc6b3320de4
Parents: 8af7372
Author: xuchuanyin <xuchuanyin@hust.edu.cn>
Authored: Thu Oct 18 17:50:57 2018 +0800
Committer: Jacky Li <jacky.likun@qq.com>
Committed: Wed Oct 24 10:58:33 2018 +0800

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |  9 ++-
 .../carbondata/core/util/CarbonProperties.java  | 17 -----
 docs/configuration-parameters.md                |  3 +-
 docs/performance-tuning.md                      |  1 -
 docs/usecases.md                                |  2 -
 .../TestLoadDataWithYarnLocalDirs.scala         |  9 ++-
 .../load/DataLoadProcessorStepOnSpark.scala     | 30 +--------
 .../spark/rdd/AlterTableLoadPartitionRDD.scala  | 41 ++----------
 .../spark/rdd/NewCarbonDataLoadRDD.scala        | 30 +--------
 .../carbondata/spark/util/CommonUtil.scala      | 66 +++++++++++---------
 .../datasources/SparkCarbonTableFormat.scala    | 29 +--------
 11 files changed, 60 insertions(+), 177 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/b21a6d49/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 1b1046a..b5e1e5d 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1371,16 +1371,15 @@ public final class CarbonCommonConstants {
   public static final String CARBON_SECURE_DICTIONARY_SERVER_DEFAULT = "true";
 
   /**
-   * whether to use multi directories when loading data,
-   * the main purpose is to avoid single-disk-hot-spot
+   * for loading, whether to use yarn's local dir; the main purpose is to avoid single-disk hot spot
    */
   @CarbonProperty
-  public static final String CARBON_USE_MULTI_TEMP_DIR = "carbon.use.multiple.temp.dir";
+  public static final String CARBON_LOADING_USE_YARN_LOCAL_DIR = "carbon.use.local.dir";
 
   /**
-   * default value for multi temp dir
+   * default value for whether to enable carbon use yarn local dir
    */
-  public static final String CARBON_USE_MULTI_TEMP_DIR_DEFAULT = "false";
+  public static final String CARBON_LOADING_USE_YARN_LOCAL_DIR_DEFAULT = "true";
 
   /**
    * name of compressor to compress sort temp files

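As a quick reference, a hedged sketch of how the renamed flag is meant
to be resolved (the lookup pattern mirrors the new
CommonUtil.getTempStoreLocations further down in this diff):

    // sketch: read the flag with its default, as the refactored code does
    val useYarnLocalDirs = CarbonProperties.getInstance().getProperty(
      CarbonCommonConstants.CARBON_LOADING_USE_YARN_LOCAL_DIR,
      CarbonCommonConstants.CARBON_LOADING_USE_YARN_LOCAL_DIR_DEFAULT)
      .equalsIgnoreCase("true")
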
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b21a6d49/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index a32ad52..e6440b6 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -1135,23 +1135,6 @@ public final class CarbonProperties {
   }
 
   /**
-   * Returns whether to use multi temp dirs
-   * @return boolean
-   */
-  public boolean isUseMultiTempDir() {
-    String usingMultiDirStr = getProperty(CarbonCommonConstants.CARBON_USE_MULTI_TEMP_DIR,
-        CarbonCommonConstants.CARBON_USE_MULTI_TEMP_DIR_DEFAULT);
-    boolean validateBoolean = CarbonUtil.validateBoolean(usingMultiDirStr);
-    if (!validateBoolean) {
-      LOGGER.warn("The carbon.use.multiple.temp.dir configuration value is invalid."
-          + "Configured value: \"" + usingMultiDirStr + "\"."
-          + "Data Load will not use multiple temp directories.");
-      usingMultiDirStr = CarbonCommonConstants.CARBON_USE_MULTI_TEMP_DIR_DEFAULT;
-    }
-    return usingMultiDirStr.equalsIgnoreCase("true");
-  }
-
-  /**
    * Return valid storage level
    * @return String
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b21a6d49/docs/configuration-parameters.md
----------------------------------------------------------------------
diff --git a/docs/configuration-parameters.md b/docs/configuration-parameters.md
index ac204b1..7a6dcab 100644
--- a/docs/configuration-parameters.md
+++ b/docs/configuration-parameters.md
@@ -83,8 +83,9 @@ This section provides the details of all the configurations required for the Car
 | carbon.enable.calculate.size | true | **For Load Operation**: Enabling this property will let carbondata calculate the size of the carbon data file (.carbondata) and the carbon index file (.carbonindex) for each load and update the table status file. **For Describe Formatted**: Enabling this property will let carbondata calculate the total size of the carbon data files and the carbon index files for each table and display it in the describe formatted command. **NOTE:** This is useful to determine the overall size of the carbondata table, and also to get an idea of how the table is growing in order to make backup strategy decisions. |
 | carbon.cutOffTimestamp | (none) | CarbonData has the capability to generate the dictionary values for the timestamp columns from the data itself without the need to store the computed dictionary values. This configuration sets the start date for calculating the timestamp. Java counts the number of milliseconds from the start of "1970-01-01 00:00:00". This property is used to customize the starting position, for example "2000-01-01 00:00:00". **NOTE:** The date must be in the form ***carbon.timestamp.format***. CarbonData supports storing data for up to 68 years. For example, if the cut-off time is 1970-01-01 05:30:00, then data up to 2038-01-01 05:30:00 will be supported by CarbonData. |
 | carbon.timegranularity | SECOND | The configuration is used to specify the data granularity level such as DAY, HOUR, MINUTE, or SECOND. This helps to store more than 68 years of data into CarbonData. |
-| carbon.use.local.dir | false | CarbonData, during data loading, writes files to local temp directories before copying the files to HDFS. This configuration is used to specify whether CarbonData can write locally to the tmp directory of the container or to the YARN application directory. |
+| carbon.use.local.dir | true | CarbonData, during data loading, writes files to local temp directories before copying the files to HDFS. This configuration is used to specify whether CarbonData can write locally to the tmp directory of the container or to the YARN application directory. |
 | carbon.use.multiple.temp.dir | false | When multiple disks are present in the system, YARN is generally configured with multiple disks to be used as temp directories for managing the containers. This configuration specifies whether to use multiple YARN local directories during data loading for disk IO load balancing. Enable ***carbon.use.local.dir*** for this configuration to take effect. **NOTE:** Data loading is an IO-intensive operation whose performance can be limited by the disk IO threshold, particularly during multi-table concurrent data load. Configuring this parameter balances the disk IO across multiple disks, thereby improving the overall load performance. |
+| carbon.sort.temp.compressor | (none) | CarbonData writes every ***carbon.sort.size*** number of records to intermediate temp files during data loading to ensure memory footprint is within limits. These temporary files can be compressed before writing in order to save storage space. This configuration specifies the name of the compressor to be used to compress the intermediate sort temp files during the sort procedure in data loading. The valid values are 'SNAPPY', 'GZIP', 'BZIP2', 'LZ4', 'ZSTD' and empty. By default, empty means that CarbonData will not compress the sort temp files. **NOTE:** The compressor will be useful if you encounter a disk bottleneck. Since the data needs to be compressed and decompressed, it involves additional CPU cycles, but this is compensated by the higher IO throughput due to less data being written to or read from the disks. |
 | carbon.load.skewedDataOptimization.enabled | false | During data loading, CarbonData divides the blocks equally so as to ensure all executors process the same number of blocks. This mechanism satisfies most scenarios and ensures maximum parallel processing for optimal data loading performance. In some business scenarios, the size of blocks may vary significantly, and hence some executors would have to do more work if they get blocks containing more data. This configuration enables a size-based block allocation strategy for data loading. When loading, carbondata will use a file-size-based block allocation strategy for task distribution. It will make sure that all the executors process the same size of data. **NOTE:** This configuration is useful if the size of your input data files varies widely, say 1MB to 1GB. For this configuration to work effectively, knowing the data pattern and size is important and necessary. |
 | carbon.load.min.size.enabled | false | During data loading, CarbonData divides the input files among the available executors to parallelize the loading operation. When the input data files are very small, this causes many small carbondata files to be generated. This configuration determines whether to enable the node minimum input data size allocation strategy for data loading. It will make sure that each node loads a minimum amount of data, thereby reducing the number of carbondata files. **NOTE:** This configuration is useful if the size of the input data files is very small, like 1MB to 256MB. Refer to the load option ***load_min_size_inmb*** to configure the minimum size to be considered for splitting files among executors. |
 | enable.data.loading.statistics | false | CarbonData has extensive logging which is useful for debugging performance issues or hard-to-locate issues. This configuration, when made ***true***, logs additional data loading statistics information to more accurately locate the issues being debugged. **NOTE:** Enabling this logs more debug information to log files, thereby increasing the log file size significantly in a short span of time. It is advised to configure the log file size and log file retention parameters in the log4j properties appropriately. Extensive logging also increases IO operations, and hence overall data loading performance might be reduced. Therefore it is recommended to enable this configuration only for the duration of debugging. |
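
As a side note, a minimal sketch of combining the loading-related
properties documented above (the property names are from the table; the
chosen values are illustrative assumptions, not recommendations):

    // sketch: tune loading via CarbonProperties before a load job
    val props = CarbonProperties.getInstance()
    // carbon.use.local.dir now defaults to true, so it need not be set;
    // compress sort temp files to trade CPU cycles for disk throughput
    props.addProperty("carbon.sort.temp.compressor", "SNAPPY")
    props.addProperty("carbon.load.skewedDataOptimization.enabled", "true")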

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b21a6d49/docs/performance-tuning.md
----------------------------------------------------------------------
diff --git a/docs/performance-tuning.md b/docs/performance-tuning.md
index 2b005af..64f80c4 100644
--- a/docs/performance-tuning.md
+++ b/docs/performance-tuning.md
@@ -170,7 +170,6 @@
 | spark.executor.instances/spark.executor.cores/spark.executor.memory | spark/conf/spark-defaults.conf | Querying | The number of executors, CPU cores, and memory used for CarbonData query. | In the bank scenario, we provide 4 CPU cores and 15 GB for each executor, which gives good performance. These two values do not mean the more the better; they need to be configured properly in case of limited resources. For example, in the bank scenario each node has enough CPU (32 cores) but less memory (64 GB), so we cannot give more CPU with less memory. With 4 cores and 12GB for each executor, GC sometimes happens during the query, which impacts the query performance very much, from 3 seconds to more than 15 seconds. In this scenario we need to increase the memory or decrease the CPU cores. |
 | carbon.detail.batch.size | spark/carbonlib/carbon.properties | Querying | The buffer size to store records returned from the block scan. | In the limit scenario this parameter is very important. For example, if your query limit is 1000 but we set this value to 3000, we get 3000 records from the scan while spark will only take 1000 rows, so the 2000 remaining are useless. In one finance test case, after we set it to 100, in the limit 1000 scenario the performance increased about 2 times in comparison to setting this value to 12000. |
 | carbon.use.local.dir | spark/carbonlib/carbon.properties | Data loading | Whether to use YARN local directories for multi-table load disk load balance | If this is set to true, CarbonData will use YARN local directories for multi-table load disk load balance, which will improve the data load performance. |
-| carbon.use.multiple.temp.dir | spark/carbonlib/carbon.properties | Data loading | Whether to use multiple YARN local directories during table data loading for disk load balance | After enabling 'carbon.use.local.dir', if this is set to true, CarbonData will use all YARN local directories during data load for disk load balance, that will improve the data load performance. Please enable this property when you encounter disk hotspot problem during data loading. |
 | carbon.sort.temp.compressor | spark/carbonlib/carbon.properties | Data loading | Specify the name of the compressor to compress the intermediate sort temporary files during the sort procedure in data loading. | The optional values are 'SNAPPY', 'GZIP', 'BZIP2', 'LZ4', 'ZSTD', and empty. By default, empty means that CarbonData will not compress the sort temp files. This parameter will be useful if you encounter a disk bottleneck. |
 | carbon.load.skewedDataOptimization.enabled | spark/carbonlib/carbon.properties | Data loading | Whether to enable size-based block allocation strategy for data loading. | When loading, carbondata will use a file-size-based block allocation strategy for task distribution. It will make sure that all the executors process the same size of data -- it's useful if the size of your input data files varies widely, say 1MB to 1GB. |
 | carbon.load.min.size.enabled | spark/carbonlib/carbon.properties | Data loading | Whether to enable node minimum input data size allocation strategy for data loading.| When loading, carbondata will use the node minimum input data size allocation strategy for task distribution. It will make sure the nodes load the minimum amount of data -- it's useful if the size of your input data files is very small, say 1MB to 256MB, avoiding the generation of a large number of small files. |

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b21a6d49/docs/usecases.md
----------------------------------------------------------------------
diff --git a/docs/usecases.md b/docs/usecases.md
index e8b98b5..c029bb3 100644
--- a/docs/usecases.md
+++ b/docs/usecases.md
@@ -72,7 +72,6 @@ Apart from these, the following CarbonData configuration was suggested to be con
 | Data Loading | table_blocksize                         | 256  | To efficiently schedule multiple tasks during query |
 | Data Loading | carbon.sort.intermediate.files.limit    | 100    | Increased to 100 as the number of cores is more. Merging can be performed in the background. If there are fewer files to merge, sort threads would be idle |
 | Data Loading | carbon.use.local.dir                    | TRUE   | The yarn application directory will usually be on a single disk. YARN would be configured with multiple disks to be used as temp or to assign randomly to applications. Using the yarn temp directory will allow carbon to use multiple disks and improve IO performance |
-| Data Loading | carbon.use.multiple.temp.dir            | TRUE   | multiple disks to write sort files will lead to better IO and reduce the IO bottleneck |
 | Compaction | carbon.compaction.level.threshold       | 6,6    | Since there are frequent small loads, compacting more segments will give better query results |
 | Compaction | carbon.enable.auto.load.merge           | true   | Since data loading is small, auto compaction keeps the number of segments less and also compaction can complete in time |
 | Compaction | carbon.number.of.cores.while.compacting | 4      | Higher number of cores can improve the compaction speed |
@@ -127,7 +126,6 @@ Use all columns as no-dictionary as the cardinality is high.
 | Data Loading | table_blocksize                         | 512                     | To efficiently schedule multiple tasks during query. This size depends on the data scenario. If the data is such that the filters would select fewer blocklets to scan, keeping a higher number works well. If the number of blocklets to scan is more, it is better to reduce the size so that more tasks can be scheduled in parallel. |
 | Data Loading | carbon.sort.intermediate.files.limit    | 100                     | Increased to 100 as the number of cores is more. Merging can be performed in the background. If there are fewer files to merge, sort threads would be idle |
 | Data Loading | carbon.use.local.dir                    | TRUE                    | The yarn application directory will usually be on a single disk. YARN would be configured with multiple disks to be used as temp or to assign randomly to applications. Using the yarn temp directory will allow carbon to use multiple disks and improve IO performance |
-| Data Loading | carbon.use.multiple.temp.dir            | TRUE                    | multiple disks to write sort files will lead to better IO and reduce the IO bottleneck |
 | Data Loading | sort.inmemory.size.in.mb                | 92160 | Memory allocated to do in-memory sorting. When more memory is available in the node, configuring this will retain more sort blocks in memory so that the merge sort is faster due to no or very little IO |
 | Compaction | carbon.major.compaction.size            | 921600                  | Sum of several loads to combine into a single segment |
 | Compaction | carbon.number.of.cores.while.compacting | 12                      | Higher number of cores can improve the compaction speed. The data size is huge, so compaction needs to use more threads to speed up the process |

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b21a6d49/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithYarnLocalDirs.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithYarnLocalDirs.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithYarnLocalDirs.scala
index ff415ae..ef1bcbb 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithYarnLocalDirs.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithYarnLocalDirs.scala
@@ -65,15 +65,14 @@ class TestLoadDataWithYarnLocalDirs extends QueryTest with BeforeAndAfterAll {
   }
 
   private def enableMultipleDir = {
-    CarbonProperties.getInstance().addProperty("carbon.use.local.dir", "true")
     CarbonProperties.getInstance().addProperty(
-      CarbonCommonConstants.CARBON_USE_MULTI_TEMP_DIR, "true")
+      CarbonCommonConstants.CARBON_LOADING_USE_YARN_LOCAL_DIR, "true")
   }
 
   private def disableMultipleDir = {
-    CarbonProperties.getInstance().addProperty("carbon.use.local.dir", "false")
-    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_USE_MULTI_TEMP_DIR,
-      CarbonCommonConstants.CARBON_USE_MULTI_TEMP_DIR_DEFAULT)
+    CarbonProperties.getInstance().addProperty(
+      CarbonCommonConstants.CARBON_LOADING_USE_YARN_LOCAL_DIR,
+      CarbonCommonConstants.CARBON_LOADING_USE_YARN_LOCAL_DIR_DEFAULT)
   }
 
   test("test carbon table data loading for multiple temp dir") {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b21a6d49/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
index f5c65b3..0a68fb0 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
@@ -17,8 +17,6 @@
 
 package org.apache.carbondata.spark.load
 
-import scala.util.Random
-
 import com.univocity.parsers.common.TextParsingException
 import org.apache.hadoop.conf.Configuration
 import org.apache.spark.{Accumulator, SparkEnv, TaskContext}
@@ -30,8 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException
 import org.apache.carbondata.core.datastore.row.CarbonRow
-import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
-import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
+import org.apache.carbondata.core.util.ThreadLocalSessionInfo
 import org.apache.carbondata.processing.loading.{BadRecordsLogger, BadRecordsLoggerProvider, CarbonDataLoadConfiguration, DataLoadProcessBuilder, TableProcessingOperations}
 import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl
 import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException
@@ -43,7 +40,7 @@ import org.apache.carbondata.processing.sort.sortdata.SortParameters
 import org.apache.carbondata.processing.store.{CarbonFactHandler, CarbonFactHandlerFactory}
 import org.apache.carbondata.processing.util.{CarbonBadRecordUtil, CarbonDataProcessorUtil}
 import org.apache.carbondata.spark.rdd.{NewRddIterator, StringArrayRow}
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, Util}
+import org.apache.carbondata.spark.util.CommonUtil
 
 object DataLoadProcessorStepOnSpark {
   private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
@@ -238,7 +235,7 @@ object DataLoadProcessorStepOnSpark {
     var dataWriter: DataWriterProcessorStepImpl = null
     try {
       model = modelBroadcast.value.getCopyWithTaskNo(index.toString)
-      val storeLocation = Array(getTempStoreLocation(index))
+      val storeLocation = CommonUtil.getTempStoreLocations(index.toString)
       val conf = DataLoadProcessBuilder.createConfiguration(model, storeLocation)
 
       tableName = model.getTableName
@@ -291,27 +288,6 @@ object DataLoadProcessorStepOnSpark {
     }
   }
 
-  private def getTempStoreLocation(index: Int): String = {
-    var storeLocation = ""
-    // this property is used to determine whether temp location for carbon is inside
-    // container temp dir or is yarn application directory.
-    val carbonUseLocalDir = CarbonProperties.getInstance()
-      .getProperty("carbon.use.local.dir", "false")
-    if (carbonUseLocalDir.equalsIgnoreCase("true")) {
-      val storeLocations = Util.getConfiguredLocalDirs(SparkEnv.get.conf)
-      if (null != storeLocations && storeLocations.nonEmpty) {
-        storeLocation = storeLocations(Random.nextInt(storeLocations.length))
-      }
-      if (storeLocation == null) {
-        storeLocation = System.getProperty("java.io.tmpdir")
-      }
-    } else {
-      storeLocation = System.getProperty("java.io.tmpdir")
-    }
-    storeLocation = storeLocation + '/' + System.nanoTime() + '_' + index
-    storeLocation
-  }
-
   private def wrapException(e: Throwable, model: CarbonLoadModel): Unit = {
     e match {
       case e: CarbonDataLoadingException => throw e

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b21a6d49/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
index 86a5043..a03447d 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
@@ -18,16 +18,14 @@
 package org.apache.carbondata.spark.rdd
 
 import scala.collection.JavaConverters._
-import scala.util.Random
 
-import org.apache.spark.{Partition, SparkEnv, TaskContext}
+import org.apache.spark.{Partition, TaskContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.execution.command.AlterPartitionModel
 import org.apache.spark.util.PartitionUtils
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
-import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.processing.loading.TableProcessingOperations
 import org.apache.carbondata.processing.partition.spliter.RowResultProcessor
 import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil}
@@ -42,7 +40,6 @@ class AlterTableLoadPartitionRDD[K, V](alterPartitionModel: AlterPartitionModel,
     prev: RDD[Array[AnyRef]])
   extends CarbonRDD[(K, V)](alterPartitionModel.sqlContext.sparkSession, prev) {
 
-  var storeLocation: String = null
   val carbonLoadModel = alterPartitionModel.carbonLoadModel
   val segmentId = alterPartitionModel.segmentId
   val oldPartitionIds = alterPartitionModel.oldPartitionIds
@@ -62,44 +59,18 @@ class AlterTableLoadPartitionRDD[K, V](alterPartitionModel: AlterPartitionModel,
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     val rows = firstParent[Array[AnyRef]].iterator(split, context).toList.asJava
     val iter = new Iterator[(K, V)] {
-      val partitionId = partitionInfo.getPartitionId(split.index)
+      val partitionId: Int = partitionInfo.getPartitionId(split.index)
       carbonLoadModel.setTaskNo(String.valueOf(partitionId))
       carbonLoadModel.setSegmentId(segmentId)
       CarbonMetadata.getInstance().addCarbonTable(
         carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable)
-      CommonUtil.setTempStoreLocation(split.index, carbonLoadModel, false, true)
-      val tempLocationKey = CarbonDataProcessorUtil
-        .getTempStoreLocationKey(carbonLoadModel.getDatabaseName,
-          carbonLoadModel.getTableName,
-          segmentId,
-          carbonLoadModel.getTaskNo,
-          false,
-          true)
-      // this property is used to determine whether temp location for carbon is inside
-      // container temp dir or is yarn application directory.
-      val carbonUseLocalDir = CarbonProperties.getInstance()
-        .getProperty("carbon.use.local.dir", "false")
 
-      if (carbonUseLocalDir.equalsIgnoreCase("true")) {
-
-        val storeLocations = Util.getConfiguredLocalDirs(SparkEnv.get.conf)
-        if (null != storeLocations && storeLocations.nonEmpty) {
-          storeLocation = storeLocations(Random.nextInt(storeLocations.length))
-        }
-        if (storeLocation == null) {
-          storeLocation = System.getProperty("java.io.tmpdir")
-        }
-      } else {
-        storeLocation = System.getProperty("java.io.tmpdir")
-      }
-      storeLocation = storeLocation + '/' + System.nanoTime() + '/' + split.index
-      CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation)
-      LOGGER.info(s"Temp storeLocation taken is $storeLocation")
-
-      val tempStoreLoc = CarbonDataProcessorUtil.getLocalDataFolderLocation(
+      CommonUtil.setTempStoreLocation(split.index, carbonLoadModel,
+        isCompactionFlow = false, isAltPartitionFlow = true)
+      val tempStoreLoc: Array[String] = CarbonDataProcessorUtil.getLocalDataFolderLocation(
         databaseName, factTableName, carbonLoadModel.getTaskNo, segmentId, false, true)
 
-      val loadStatus = if (rows.isEmpty) {
+      val loadStatus: Boolean = if (rows.isEmpty) {
         LOGGER.info("After repartition this split, NO target rows to write back.")
         true
       } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b21a6d49/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index fe09034..041dc1c 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -17,13 +17,11 @@
 
 package org.apache.carbondata.spark.rdd
 
-import java.io._
 import java.nio.ByteBuffer
 import java.text.SimpleDateFormat
 import java.util.{Date, UUID}
 
 import scala.collection.mutable
-import scala.util.Random
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskType}
@@ -89,35 +87,9 @@ class SparkPartitionLoader(model: CarbonLoadModel,
     CarbonProperties.getInstance().addProperty("aggregate.columnar.keyblock", "true")
     CarbonProperties.getInstance().addProperty("is.compressed.keyblock", "false")
 
-    // this property is used to determine whether temp location for carbon is inside
-    // container temp dir or is yarn application directory.
-    val isCarbonUseLocalDir = CarbonProperties.getInstance()
-      .getProperty("carbon.use.local.dir", "false").equalsIgnoreCase("true")
-
-    val isCarbonUseMultiDir = CarbonProperties.getInstance().isUseMultiTempDir
-
-    if (isCarbonUseLocalDir) {
-      val yarnStoreLocations = Util.getConfiguredLocalDirs(SparkEnv.get.conf)
-
-      if (!isCarbonUseMultiDir && null != yarnStoreLocations && yarnStoreLocations.nonEmpty) {
-        // use single dir
-        storeLocation = storeLocation :+
-            (yarnStoreLocations(Random.nextInt(yarnStoreLocations.length)) + tmpLocationSuffix)
-        if (storeLocation == null || storeLocation.isEmpty) {
-          storeLocation = storeLocation :+
-              (System.getProperty("java.io.tmpdir") + tmpLocationSuffix)
-        }
-      } else {
-        // use all the yarn dirs
-        storeLocation = yarnStoreLocations.map(_ + tmpLocationSuffix)
-      }
-    } else {
-      storeLocation = storeLocation :+ (System.getProperty("java.io.tmpdir") + tmpLocationSuffix)
-    }
+    storeLocation = CommonUtil.getTempStoreLocations(splitIndex.toString)
     LOGGER.info("Temp location for loading data: " + storeLocation.mkString(","))
   }
-
-  private def tmpLocationSuffix = File.separator + "carbon" + System.nanoTime() + "_" + splitIndex
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b21a6d49/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index 82a2f9d..7071295 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -18,6 +18,7 @@
 package org.apache.carbondata.spark.util
 
 
+import java.io.File
 import java.text.SimpleDateFormat
 import java.util
 import java.util.regex.{Matcher, Pattern}
@@ -705,38 +706,47 @@ object CommonUtil {
       carbonLoadModel: CarbonLoadModel,
       isCompactionFlow: Boolean,
       isAltPartitionFlow: Boolean) : Unit = {
-    var storeLocation: String = null
-
-    // this property is used to determine whether temp location for carbon is inside
-    // container temp dir or is yarn application directory.
-    val carbonUseLocalDir = CarbonProperties.getInstance()
-      .getProperty("carbon.use.local.dir", "false")
-
-    if (carbonUseLocalDir.equalsIgnoreCase("true")) {
+    val storeLocation = getTempStoreLocations(index.toString).mkString(File.pathSeparator)
+
+    val tempLocationKey = CarbonDataProcessorUtil.getTempStoreLocationKey(
+      carbonLoadModel.getDatabaseName,
+      carbonLoadModel.getTableName,
+      carbonLoadModel.getSegmentId,
+      carbonLoadModel.getTaskNo,
+      isCompactionFlow,
+      isAltPartitionFlow)
+    CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation)
+  }
 
-      val storeLocations = Util.getConfiguredLocalDirs(SparkEnv.get.conf)
-      if (null != storeLocations && storeLocations.nonEmpty) {
-        storeLocation = storeLocations(Random.nextInt(storeLocations.length))
-      }
-      if (storeLocation == null) {
-        storeLocation = System.getProperty("java.io.tmpdir")
+  /**
+   * get the temp locations for each process thread
+   *
+   * @param index the id for each process thread
+   * @return an array of temp locations
+   */
+  def getTempStoreLocations(index: String) : Array[String] = {
+    var storeLocation: Array[String] = Array[String]()
+    val isCarbonUseYarnLocalDir = CarbonProperties.getInstance().getProperty(
+      CarbonCommonConstants.CARBON_LOADING_USE_YARN_LOCAL_DIR,
+      CarbonCommonConstants.CARBON_LOADING_USE_YARN_LOCAL_DIR_DEFAULT).equalsIgnoreCase("true")
+    val tmpLocationSuffix =
+      s"${File.separator}carbon${System.nanoTime()}${CarbonCommonConstants.UNDERSCORE}$index"
+    if (isCarbonUseYarnLocalDir) {
+      val yarnStoreLocations = Util.getConfiguredLocalDirs(SparkEnv.get.conf)
+
+      if (null != yarnStoreLocations && yarnStoreLocations.nonEmpty) {
+        storeLocation = yarnStoreLocations.map(_ + tmpLocationSuffix)
+      } else {
+        LOGGER.warn("It seems that we didn't configure local dirs for yarn," +
+                    " so we are unable to use them for data loading." +
+                    " Here we will fall back to using the java tmp dir.")
+        storeLocation = storeLocation :+ (System.getProperty("java.io.tmpdir") + tmpLocationSuffix)
       }
     } else {
-      storeLocation = System.getProperty("java.io.tmpdir")
-    }
-    storeLocation = storeLocation + CarbonCommonConstants.FILE_SEPARATOR + "carbon" +
-      System.nanoTime() + CarbonCommonConstants.UNDERSCORE + index
-
-    val tempLocationKey = CarbonDataProcessorUtil
-      .getTempStoreLocationKey(carbonLoadModel.getDatabaseName,
-        carbonLoadModel.getTableName,
-        carbonLoadModel.getSegmentId,
-        carbonLoadModel.getTaskNo,
-        isCompactionFlow,
-        isAltPartitionFlow)
-    CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation)
+      storeLocation = storeLocation :+ (System.getProperty("java.io.tmpdir") + tmpLocationSuffix)
+    }
+    storeLocation
   }
-
   /**
    * This method will validate the cache level
    *

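A brief usage sketch of the consolidated helper (the call shape matches
the refactored call sites in this commit; the literal task index "0" is
an illustrative assumption):

    // sketch: one temp dir per configured YARN local dir, or a single
    // java.io.tmpdir-based path when no local dirs are configured
    val locations: Array[String] = CommonUtil.getTempStoreLocations("0")
    locations.foreach(dir => println(s"Temp location for loading data: $dir"))
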
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b21a6d49/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
index b605a1d..6bbdcec 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
@@ -53,7 +53,7 @@ import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat.CarbonRecordWrit
 import org.apache.carbondata.hadoop.internal.ObjectArrayWritable
 import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, CarbonLoadModelBuilder, LoadOption}
 import org.apache.carbondata.processing.util.CarbonBadRecordUtil
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, Util}
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, Util}
 
 class SparkCarbonTableFormat
   extends FileFormat
@@ -172,33 +172,8 @@ with Serializable {
           dataSchema: StructType,
           context: TaskAttemptContext): OutputWriter = {
         val model = CarbonTableOutputFormat.getLoadModel(context.getConfiguration)
-        val isCarbonUseMultiDir = CarbonProperties.getInstance().isUseMultiTempDir
-        var storeLocation: Array[String] = Array[String]()
-        val isCarbonUseLocalDir = CarbonProperties.getInstance()
-          .getProperty("carbon.use.local.dir", "false").equalsIgnoreCase("true")
-
-
         val taskNumber = generateTaskNumber(path, context, model.getSegmentId)
-        val tmpLocationSuffix =
-          File.separator + "carbon" + System.nanoTime() + File.separator + taskNumber
-        if (isCarbonUseLocalDir) {
-          val yarnStoreLocations = Util.getConfiguredLocalDirs(SparkEnv.get.conf)
-          if (!isCarbonUseMultiDir && null != yarnStoreLocations && yarnStoreLocations.nonEmpty) {
-            // use single dir
-            storeLocation = storeLocation :+
-              (yarnStoreLocations(Random.nextInt(yarnStoreLocations.length)) + tmpLocationSuffix)
-            if (storeLocation == null || storeLocation.isEmpty) {
-              storeLocation = storeLocation :+
-                (System.getProperty("java.io.tmpdir") + tmpLocationSuffix)
-            }
-          } else {
-            // use all the yarn dirs
-            storeLocation = yarnStoreLocations.map(_ + tmpLocationSuffix)
-          }
-        } else {
-          storeLocation =
-            storeLocation :+ (System.getProperty("java.io.tmpdir") + tmpLocationSuffix)
-        }
+        val storeLocation = CommonUtil.getTempStoreLocations(taskNumber)
         CarbonTableOutputFormat.setTempStoreLocations(context.getConfiguration, storeLocation)
         new CarbonOutputWriter(path, context, dataSchema.map(_.dataType), taskNumber, model)
       }

