From: chenliang613@apache.org
To: commits@carbondata.apache.org
Reply-To: dev@carbondata.apache.org
Date: Thu, 27 Jul 2017 13:17:10 -0000
Subject: [2/2] carbondata git commit: [CARBONDATA-1281] Support multiple temp dirs for writing files while loading data

[CARBONDATA-1281] Support multiple temp dirs for writing files while loading data

This closes #1198

Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/ded8b416
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/ded8b416
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/ded8b416

Branch: refs/heads/master
Commit: ded8b4162f1fc156355a183792d8077e9db794c6
Parents: 79feac9
Author: xuchuanyin
Authored: Tue Jul 25 19:17:53 2017 +0800
Committer: chenliang613
Committed: Thu Jul 27 21:16:40 2017 +0800

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java | 12 +++
 .../carbondata/core/util/CarbonMergerUtil.java | 16 +++
 .../carbondata/core/util/CarbonProperties.java | 17 ++++
 .../core/util/path/CarbonTablePath.java | 4 +-
 docs/useful-tips-on-carbondata.md | 1 +
 .../hadoop/test/util/StoreCreator.java | 2 +-
 .../TestLoadDataWithYarnLocalDirs.scala | 100 +++++++++++++++++++
 .../org/apache/spark/util/SparkUtil4Test.scala | 33 ++++++
 .../carbondata/spark/load/CarbonLoaderUtil.java | 25 +++--
 .../load/DataLoadProcessorStepOnSpark.scala | 2 +-
 .../spark/rdd/NewCarbonDataLoadRDD.scala | 37 ++++---
 .../merger/CompactionResultSortProcessor.java | 17 ++--
 .../merger/RowResultMergerProcessor.java | 9 +-
 .../processing/newflow/DataLoadExecutor.java | 2 +-
 .../newflow/DataLoadProcessBuilder.java | 13 +--
 .../sort/impl/ParallelReadMergeSorterImpl.java | 8 +-
 ...arallelReadMergeSorterWithBucketingImpl.java | 13 +--
 .../UnsafeBatchParallelReadMergeSorterImpl.java | 7 +-
.../impl/UnsafeParallelReadMergeSorterImpl.java | 11 +- ...arallelReadMergeSorterWithBucketingImpl.java | 13 +-- .../newflow/sort/unsafe/UnsafeSortDataRows.java | 12 ++- .../unsafe/merger/UnsafeIntermediateMerger.java | 7 +- .../UnsafeSingleThreadFinalSortFilesMerger.java | 33 +++--- .../CarbonRowDataWriterProcessorStepImpl.java | 12 +-- .../steps/DataWriterBatchProcessorStepImpl.java | 9 +- .../steps/DataWriterProcessorStepImpl.java | 12 +-- .../sortandgroupby/sortdata/SortDataRows.java | 17 ++-- .../sortandgroupby/sortdata/SortParameters.java | 27 +++-- .../store/CarbonFactDataHandlerColumnar.java | 3 +- .../store/CarbonFactDataHandlerModel.java | 10 +- .../store/SingleThreadFinalSortFilesMerger.java | 49 ++++++--- .../store/writer/AbstractFactDataWriter.java | 48 +++++++-- .../store/writer/CarbonDataWriterVo.java | 6 +- .../util/CarbonDataProcessorUtil.java | 80 +++++++++++---- .../carbondata/processing/StoreCreator.java | 3 +- 35 files changed, 484 insertions(+), 186 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/ded8b416/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java index 8427a22..3acdba9 100644 --- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java @@ -1296,6 +1296,18 @@ public final class CarbonCommonConstants { public static final String CARBON_LEASE_RECOVERY_RETRY_INTERVAL = "carbon.lease.recovery.retry.interval"; + /** + * whether to use multi directories when loading data, + * the main purpose is to avoid single-disk-hot-spot + */ + @CarbonProperty + public static final String CARBON_USE_MULTI_TEMP_DIR = "carbon.use.multiple.temp.dir"; + + /** + * default value for multi temp dir + */ + public static final String CARBON_USE_MULTI_TEMP_DIR_DEFAULT = "false"; + private CarbonCommonConstants() { } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/ded8b416/core/src/main/java/org/apache/carbondata/core/util/CarbonMergerUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonMergerUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonMergerUtil.java index 8df659a..c1f3f5e 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonMergerUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonMergerUtil.java @@ -46,4 +46,20 @@ public class CarbonMergerUtil { return localCardinality; } + /** + * read from the first non-empty level metadata + * @param paths paths + * @param tableName table name + * @return cardinality + */ + public static int[] getCardinalityFromLevelMetadata(String[] paths, String tableName) { + int[] localCardinality = null; + for (String path : paths) { + localCardinality = getCardinalityFromLevelMetadata(path, tableName); + if (null != localCardinality) { + break; + } + } + return localCardinality; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/ded8b416/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java ---------------------------------------------------------------------- diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java index d96d522..842543e 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java @@ -851,6 +851,23 @@ public final class CarbonProperties { } /** + * Returns whether to use multi temp dirs + * @return boolean + */ + public boolean isUseMultiTempDir() { + String usingMultiDirStr = getProperty(CarbonCommonConstants.CARBON_USE_MULTI_TEMP_DIR, + CarbonCommonConstants.CARBON_USE_MULTI_TEMP_DIR_DEFAULT); + boolean validateBoolean = CarbonUtil.validateBoolean(usingMultiDirStr); + if (!validateBoolean) { + LOGGER.info("The carbon.use.multiple.temp.dir configuration value is invalid." + + "Configured value: \"" + usingMultiDirStr + "\"." + + "Data Load will not use multiple temp directories."); + usingMultiDirStr = CarbonCommonConstants.CARBON_USE_MULTI_TEMP_DIR_DEFAULT; + } + return usingMultiDirStr.equalsIgnoreCase("true"); + } + + /** * returns true if carbon property * @param key * @return http://git-wip-us.apache.org/repos/asf/carbondata/blob/ded8b416/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java index 5824d76..7b8904c 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java +++ b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java @@ -385,7 +385,7 @@ public class CarbonTablePath extends Path { * @param factUpdateTimeStamp unique identifier to identify an update * @return gets data file name only with out path */ - public String getCarbonDataFileName(Integer filePartNo, Integer taskNo, int bucketNumber, + public static String getCarbonDataFileName(Integer filePartNo, Integer taskNo, int bucketNumber, int batchNo, String factUpdateTimeStamp) { return DATA_PART_PREFIX + filePartNo + "-" + taskNo + BATCH_PREFIX + batchNo + "-" + bucketNumber + "-" + factUpdateTimeStamp + CARBON_DATA_EXT; @@ -398,7 +398,7 @@ public class CarbonTablePath extends Path { * @param factUpdatedTimeStamp time stamp * @return filename */ - public String getCarbonIndexFileName(int taskNo, int bucketNumber, int batchNo, + public static String getCarbonIndexFileName(int taskNo, int bucketNumber, int batchNo, String factUpdatedTimeStamp) { return taskNo + BATCH_PREFIX + batchNo + "-" + bucketNumber + "-" + factUpdatedTimeStamp + INDEX_FILE_EXT; http://git-wip-us.apache.org/repos/asf/carbondata/blob/ded8b416/docs/useful-tips-on-carbondata.md ---------------------------------------------------------------------- diff --git a/docs/useful-tips-on-carbondata.md b/docs/useful-tips-on-carbondata.md index d5ced69..6c73b5e 100644 --- a/docs/useful-tips-on-carbondata.md +++ b/docs/useful-tips-on-carbondata.md @@ -231,5 +231,6 @@ scenarios. After the completion of POC, some of the configurations impacting the | spark.executor.instances/spark.executor.cores/spark.executor.memory | spark/conf/spark-defaults.conf | Querying | The number of executors, CPU cores, and memory used for CarbonData query. | In the bank scenario, we provide the 4 CPUs cores and 15 GB for each executor which can get good performance. This 2 value does not mean more the better. 
It needs to be configured properly in case of limited resources. For example, In the bank scenario, it has enough CPU 32 cores each node but less memory 64 GB each node. So we cannot give more CPU but less memory. For example, when 4 cores and 12GB for each executor. It sometimes happens GC during the query which impact the query performance very much from the 3 second to more than 15 seconds. In this scenario need to increase the memory or decrease the CPU cores. |
| carbon.detail.batch.size | spark/carbonlib/carbon.properties | Data loading | The buffer size to store records, returned from the block scan. | In limit scenario this parameter is very important. For example your query limit is 1000. But if we set this value to 3000 that means we get 3000 records from scan but spark will only take 1000 rows. So the 2000 remaining are useless. In one Finance test case after we set it to 100, in the limit 1000 scenario the performance increase about 2 times in comparison to if we set this value to 12000. |
| carbon.use.local.dir | spark/carbonlib/carbon.properties | Data loading | Whether use YARN local directories for multi-table load disk load balance | If this is set it to true CarbonData will use YARN local directories for multi-table load disk load balance, that will improve the data load performance. |
+| carbon.use.multiple.temp.dir | spark/carbonlib/carbon.properties | Data loading | Whether to use multiple YARN local directories during table data loading for disk load balance | After enabling 'carbon.use.local.dir', if this is set to true, CarbonData will use all YARN local directories during data load for disk load balance, that will improve the data load performance. Please enable this property when you encounter disk hotspot problem during data loading. |

 Note: If your CarbonData instance is provided only for query, you may specify the property 'spark.speculation=true' which is in conf directory of spark.
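[Editor's note] The documented way to enable the new behaviour is to add both keys to spark/carbonlib/carbon.properties, as the table row above states. The same two keys can also be set programmatically through CarbonProperties, which is essentially what the new TestLoadDataWithYarnLocalDirs test does before loading. The sketch below is illustrative only and is not part of this commit (the class name and the main() wrapper are made up); it relies on the CARBON_USE_MULTI_TEMP_DIR constant and the isUseMultiTempDir() accessor introduced in this patch.

import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.util.CarbonProperties;

public class MultiTempDirConfigSketch {  // hypothetical class, for illustration
  public static void main(String[] args) {
    CarbonProperties props = CarbonProperties.getInstance();
    // multiple temp dirs only take effect when YARN local dirs are enabled first
    props.addProperty("carbon.use.local.dir", "true");
    props.addProperty(CarbonCommonConstants.CARBON_USE_MULTI_TEMP_DIR, "true");
    // isUseMultiTempDir() (added by this commit) falls back to the default
    // value "false" when the configured value is not a valid boolean
    System.out.println("use multiple temp dirs: " + props.isUseMultiTempDir());
  }
}

With both keys set, each loading task writes its sort temp files across all configured YARN local directories instead of a single randomly chosen one, which is what spreads the disk I/O during load.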
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ded8b416/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java ---------------------------------------------------------------------- diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java index 8cad313..cf44c6f 100644 --- a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java +++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java @@ -412,7 +412,7 @@ public class StoreCreator { CSVRecordReaderIterator readerIterator = new CSVRecordReaderIterator(recordReader, blockDetails, hadoopAttemptContext); new DataLoadExecutor().execute(loadModel, - storeLocation, + new String[] {storeLocation}, new CarbonIterator[]{readerIterator}); info.setDatabaseName(databaseName); http://git-wip-us.apache.org/repos/asf/carbondata/blob/ded8b416/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithYarnLocalDirs.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithYarnLocalDirs.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithYarnLocalDirs.scala new file mode 100644 index 0000000..e92a7fd --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithYarnLocalDirs.scala @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.integration.spark.testsuite.dataload + +import java.io.File + +import org.apache.spark.SparkContext +import org.apache.spark.sql.Row +import org.apache.spark.sql.test.util.QueryTest +import org.apache.spark.util.SparkUtil4Test +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} + +/** + * Test Class for data loading using multiple temp yarn dirs. + * It would be better to test with massive data, + * since small amount of data will generate few (or none) temp files, + * which will not fully utilize all the temp dirs configured. 
+ */ +class TestLoadDataWithYarnLocalDirs extends QueryTest with BeforeAndAfterAll { + override def beforeAll { + sql("drop table if exists carbontable_yarnLocalDirs") + sql("CREATE TABLE carbontable_yarnLocalDirs (id int, name string, city string, age int) " + + "STORED BY 'org.apache.carbondata.format'") + } + + private def getMockedYarnLocalDirs = { + val multi_dir_root = System.getProperty("java.io.tmpdir") + File.separator + + "yarn_local_multiple_dir" + File.separator + (1 to 3).map(multi_dir_root + "multiple" + _).mkString(",") + } + + private def initYarnLocalDir = { + //set all the possible env for yarn local dirs in case of various deploy environment + val sparkConf = SparkContext.getOrCreate().getConf + sparkConf.set("SPARK_EXECUTOR_DIRS", getMockedYarnLocalDirs) + sparkConf.set("SPARK_LOCAL_DIRS", getMockedYarnLocalDirs) + sparkConf.set("MESOS_DIRECTORY", getMockedYarnLocalDirs) + sparkConf.set("spark.local.dir", getMockedYarnLocalDirs) + sparkConf.set("LOCAL_DIRS", getMockedYarnLocalDirs) + + SparkUtil4Test.getOrCreateLocalRootDirs(sparkConf) + } + + private def cleanUpYarnLocalDir = { + initYarnLocalDir + .foreach(dir => CarbonUtil.deleteFoldersAndFiles(new File(dir))) + } + + private def enableMultipleDir = { + CarbonProperties.getInstance().addProperty("carbon.use.local.dir", "true") + CarbonProperties.getInstance().addProperty( + CarbonCommonConstants.CARBON_USE_MULTI_TEMP_DIR, "true") + } + + private def disableMultipleDir = { + CarbonProperties.getInstance().addProperty("carbon.use.local.dir", "false") + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_USE_MULTI_TEMP_DIR, + CarbonCommonConstants.CARBON_USE_MULTI_TEMP_DIR_DEFAULT) + } + + test("test carbon table data loading for multiple temp dir") { + initYarnLocalDir + + enableMultipleDir + + sql(s"LOAD DATA LOCAL INPATH '${resourcesPath}/sample.csv' INTO TABLE " + + "carbontable_yarnLocalDirs OPTIONS('DELIMITER'= ',')") + + disableMultipleDir + + checkAnswer(sql("select id from carbontable_yarnLocalDirs"), + Seq(Row(1), Row(2), Row(3), Row(3))) + + cleanUpYarnLocalDir + } + + override def afterAll { + sql("drop table if exists carbontable_yarnLocalDirs") + + cleanUpYarnLocalDir + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/ded8b416/integration/spark-common-test/src/test/scala/org/apache/spark/util/SparkUtil4Test.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/spark/util/SparkUtil4Test.scala b/integration/spark-common-test/src/test/scala/org/apache/spark/util/SparkUtil4Test.scala new file mode 100644 index 0000000..ef05264 --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/spark/util/SparkUtil4Test.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util + +import org.apache.spark.SparkConf + +/** + * This class is for accessing utils in spark package for tests + */ +object SparkUtil4Test { + def getConfiguredLocalDirs(conf: SparkConf): Array[String] = { + Utils.getConfiguredLocalDirs(conf) + } + + def getOrCreateLocalRootDirs(conf: SparkConf): Array[String] = { + Utils.getOrCreateLocalRootDirs(conf) + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/ded8b416/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java index a9c50f7..6164286 100644 --- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java +++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java @@ -76,6 +76,7 @@ import org.apache.carbondata.processing.model.CarbonLoadModel; import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; import com.google.gson.Gson; +import org.apache.commons.lang3.StringUtils; import org.apache.spark.SparkConf; import org.apache.spark.util.Utils; @@ -215,8 +216,8 @@ public final class CarbonLoaderUtil { .getTempStoreLocationKey(databaseName, tableName, loadModel.getSegmentId(), loadModel.getTaskNo(), isCompactionFlow); // form local store location - final String localStoreLocation = CarbonProperties.getInstance().getProperty(tempLocationKey); - if (localStoreLocation == null) { + final String localStoreLocations = CarbonProperties.getInstance().getProperty(tempLocationKey); + if (localStoreLocations == null) { throw new RuntimeException("Store location not set for the key " + tempLocationKey); } // submit local folder clean up in another thread so that main thread execution is not blocked @@ -224,16 +225,18 @@ public final class CarbonLoaderUtil { try { localFolderDeletionService.submit(new Callable() { @Override public Void call() throws Exception { - try { - long startTime = System.currentTimeMillis(); - File file = new File(localStoreLocation); - CarbonUtil.deleteFoldersAndFiles(file); - LOGGER.info( - "Deleted the local store location" + localStoreLocation + " : TIme taken: " + ( - System.currentTimeMillis() - startTime)); - } catch (IOException | InterruptedException e) { - LOGGER.error(e, "Failed to delete local data load folder location"); + long startTime = System.currentTimeMillis(); + String[] locArray = StringUtils.split(localStoreLocations, File.pathSeparator); + for (String loc : locArray) { + try { + CarbonUtil.deleteFoldersAndFiles(new File(loc)); + } catch (IOException | InterruptedException e) { + LOGGER.error(e, + "Failed to delete local data load folder location: " + loc); + } } + LOGGER.info("Deleted the local store location: " + localStoreLocations + + " : Time taken: " + (System.currentTimeMillis() - startTime)); return null; } }); http://git-wip-us.apache.org/repos/asf/carbondata/blob/ded8b416/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala ---------------------------------------------------------------------- diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala index 47253b5..3f1637b 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala @@ -154,7 +154,7 @@ object DataLoadProcessorStepOnSpark { try { model = modelBroadcast.value.getCopyWithTaskNo(index.toString) - val storeLocation = getTempStoreLocation(index) + val storeLocation = Array(getTempStoreLocation(index)) val conf = DataLoadProcessBuilder.createConfiguration(model, storeLocation) tableName = model.getTableName http://git-wip-us.apache.org/repos/asf/carbondata/blob/ded8b416/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala index e558218..b51800e 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala @@ -17,7 +17,7 @@ package org.apache.carbondata.spark.rdd -import java.io.{IOException, ObjectInputStream, ObjectOutputStream} +import java.io.{File, IOException, ObjectInputStream, ObjectOutputStream} import java.nio.ByteBuffer import java.text.SimpleDateFormat import java.util.{Date, UUID} @@ -123,7 +123,7 @@ class SparkPartitionLoader(model: CarbonLoadModel, loadMetadataDetails: LoadMetadataDetails) { private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) - var storeLocation: String = "" + var storeLocation: Array[String] = Array[String]() def initialize(): Unit = { val carbonPropertiesFilePath = System.getProperty("carbon.properties.filepath", null) @@ -143,22 +143,33 @@ class SparkPartitionLoader(model: CarbonLoadModel, // this property is used to determine whether temp location for carbon is inside // container temp dir or is yarn application directory. 
- val carbonUseLocalDir = CarbonProperties.getInstance() - .getProperty("carbon.use.local.dir", "false") - if (carbonUseLocalDir.equalsIgnoreCase("true")) { - val storeLocations = CarbonLoaderUtil.getConfiguredLocalDirs(SparkEnv.get.conf) - if (null != storeLocations && storeLocations.nonEmpty) { - storeLocation = storeLocations(Random.nextInt(storeLocations.length)) - } - if (storeLocation == null) { - storeLocation = System.getProperty("java.io.tmpdir") + val isCarbonUseLocalDir = CarbonProperties.getInstance() + .getProperty("carbon.use.local.dir", "false").equalsIgnoreCase("true") + + val isCarbonUseMultiDir = CarbonProperties.getInstance().isUseMultiTempDir + + if (isCarbonUseLocalDir) { + val yarnStoreLocations = CarbonLoaderUtil.getConfiguredLocalDirs(SparkEnv.get.conf) + + if (!isCarbonUseMultiDir && null != yarnStoreLocations && yarnStoreLocations.nonEmpty) { + // use single dir + storeLocation = storeLocation :+ + (yarnStoreLocations(Random.nextInt(yarnStoreLocations.length)) + tmpLocationSuffix) + if (storeLocation == null || storeLocation.isEmpty) { + storeLocation = storeLocation :+ + (System.getProperty("java.io.tmpdir") + tmpLocationSuffix) + } + } else { + // use all the yarn dirs + storeLocation = yarnStoreLocations.map(_ + tmpLocationSuffix) } } else { - storeLocation = System.getProperty("java.io.tmpdir") + storeLocation = storeLocation :+ (System.getProperty("java.io.tmpdir") + tmpLocationSuffix) } - storeLocation = storeLocation + '/' + System.nanoTime() + '/' + splitIndex + LOGGER.info("Temp location for loading data: " + storeLocation.mkString(",")) } + private def tmpLocationSuffix = File.separator + System.nanoTime() + File.separator + splitIndex } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/ded8b416/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java index abd8824..56aaf54 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java @@ -100,7 +100,7 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor { /** * temp store location to be sued during data load */ - private String tempStoreLocation; + private String[] tempStoreLocation; /** * table name */ @@ -178,10 +178,12 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor { */ private void deleteTempStoreLocation() { if (null != tempStoreLocation) { - try { - CarbonUtil.deleteFoldersAndFiles(new File[] { new File(tempStoreLocation) }); - } catch (IOException | InterruptedException e) { - LOGGER.error("Problem deleting local folders during compaction: " + e.getMessage()); + for (String tempLoc : tempStoreLocation) { + try { + CarbonUtil.deleteFoldersAndFiles(new File(tempLoc)); + } catch (IOException | InterruptedException e) { + LOGGER.error("Problem deleting local folders during compaction: " + e.getMessage()); + } } } } @@ -358,8 +360,6 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor { * sort temp files */ private void initializeFinalThreadMergerForMergeSort() { - String sortTempFileLocation = tempStoreLocation + 
CarbonCommonConstants.FILE_SEPARATOR - + CarbonCommonConstants.SORT_TEMP_FILE_LOCATION; boolean[] noDictionarySortColumnMapping = null; if (noDictionaryColMapping.length == this.segmentProperties.getNumberOfSortColumns()) { noDictionarySortColumnMapping = noDictionaryColMapping; @@ -368,6 +368,9 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor { System.arraycopy(noDictionaryColMapping, 0, noDictionarySortColumnMapping, 0, noDictionarySortColumnMapping.length); } + + String[] sortTempFileLocation = CarbonDataProcessorUtil.arrayAppend(tempStoreLocation, + CarbonCommonConstants.FILE_SEPARATOR, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION); finalMerger = new SingleThreadFinalSortFilesMerger(sortTempFileLocation, tableName, dimensionColumnCount, segmentProperties.getComplexDimensions().size(), measureCount, noDictionaryCount, http://git-wip-us.apache.org/repos/asf/carbondata/blob/ded8b416/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java index 7fdfbcc..ef53163 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java @@ -16,7 +16,6 @@ */ package org.apache.carbondata.processing.merger; -import java.io.File; import java.util.AbstractQueue; import java.util.Comparator; import java.util.List; @@ -40,6 +39,7 @@ import org.apache.carbondata.processing.model.CarbonLoadModel; import org.apache.carbondata.processing.store.CarbonFactDataHandlerColumnar; import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel; import org.apache.carbondata.processing.store.CarbonFactHandler; +import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; /** * This is the Merger class responsible for the merging of the segments. 
@@ -57,12 +57,11 @@ public class RowResultMergerProcessor extends AbstractResultProcessor { LogServiceFactory.getLogService(RowResultMergerProcessor.class.getName()); public RowResultMergerProcessor(String databaseName, - String tableName, SegmentProperties segProp, String tempStoreLocation, + String tableName, SegmentProperties segProp, String[] tempStoreLocation, CarbonLoadModel loadModel, CompactionType compactionType) { this.segprop = segProp; - if (!new File(tempStoreLocation).mkdirs()) { - LOGGER.error("Error while new File(tempStoreLocation).mkdirs() "); - } + CarbonDataProcessorUtil.createLocations(tempStoreLocation); + CarbonTable carbonTable = CarbonMetadata.getInstance() .getCarbonTable(databaseName + CarbonCommonConstants.UNDERSCORE + tableName); CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonFactDataHandlerModel http://git-wip-us.apache.org/repos/asf/carbondata/blob/ded8b416/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadExecutor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadExecutor.java index d4e79f8..20e4d97 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadExecutor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadExecutor.java @@ -37,7 +37,7 @@ public class DataLoadExecutor { private boolean isClosed; - public void execute(CarbonLoadModel loadModel, String storeLocation, + public void execute(CarbonLoadModel loadModel, String[] storeLocation, CarbonIterator[] inputIterators) throws Exception { try { loadProcessorStep = http://git-wip-us.apache.org/repos/asf/carbondata/blob/ded8b416/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java index 9a0c193..518d64b 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java @@ -47,6 +47,8 @@ import org.apache.carbondata.processing.newflow.steps.InputProcessorStepImpl; import org.apache.carbondata.processing.newflow.steps.SortProcessorStepImpl; import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; +import org.apache.commons.lang3.StringUtils; + /** * It builds the pipe line of steps for loading data to carbon. 
*/ @@ -55,7 +57,7 @@ public final class DataLoadProcessBuilder { private static final LogService LOGGER = LogServiceFactory.getLogService(DataLoadProcessBuilder.class.getName()); - public AbstractDataLoadProcessorStep build(CarbonLoadModel loadModel, String storeLocation, + public AbstractDataLoadProcessorStep build(CarbonLoadModel loadModel, String[] storeLocation, CarbonIterator[] inputIterators) throws Exception { CarbonDataLoadConfiguration configuration = createConfiguration(loadModel, storeLocation); SortScopeOptions.SortScope sortScope = CarbonDataProcessorUtil.getSortScope(configuration); @@ -134,17 +136,16 @@ public final class DataLoadProcessBuilder { } public static CarbonDataLoadConfiguration createConfiguration(CarbonLoadModel loadModel, - String storeLocation) { - if (!new File(storeLocation).mkdirs()) { - LOGGER.error("Error while creating the temp store path: " + storeLocation); - } + String[] storeLocation) { + CarbonDataProcessorUtil.createLocations(storeLocation); String databaseName = loadModel.getDatabaseName(); String tableName = loadModel.getTableName(); String tempLocationKey = CarbonDataProcessorUtil .getTempStoreLocationKey(databaseName, tableName, loadModel.getSegmentId(), loadModel.getTaskNo(), false); - CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation); + CarbonProperties.getInstance().addProperty(tempLocationKey, + StringUtils.join(storeLocation, File.pathSeparator)); CarbonProperties.getInstance() .addProperty(CarbonCommonConstants.STORE_LOCATION_HDFS, loadModel.getStorePath()); http://git-wip-us.apache.org/repos/asf/carbondata/blob/ded8b416/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java index 849e549..1aa06f6 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java @@ -68,16 +68,16 @@ public class ParallelReadMergeSorterImpl extends AbstractMergeSorter { public void initialize(SortParameters sortParameters) { this.sortParameters = sortParameters; intermediateFileMerger = new SortIntermediateFileMerger(sortParameters); - String storeLocation = + String[] storeLocations = CarbonDataProcessorUtil.getLocalDataFolderLocation( sortParameters.getDatabaseName(), sortParameters.getTableName(), String.valueOf(sortParameters.getTaskNo()), sortParameters.getPartitionID(), sortParameters.getSegmentId() + "", false); // Set the data file location - String dataFolderLocation = - storeLocation + File.separator + CarbonCommonConstants.SORT_TEMP_FILE_LOCATION; + String[] dataFolderLocations = CarbonDataProcessorUtil.arrayAppend(storeLocations, + File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION); finalMerger = - new SingleThreadFinalSortFilesMerger(dataFolderLocation, sortParameters.getTableName(), + new SingleThreadFinalSortFilesMerger(dataFolderLocations, sortParameters.getTableName(), sortParameters.getDimColCount(), sortParameters.getComplexDimColCount(), sortParameters.getMeasureColCount(), sortParameters.getNoDictionaryCount(), sortParameters.getMeasureDataType(), 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ded8b416/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java index 4522d8e..7314b1e 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java @@ -134,13 +134,13 @@ public class ParallelReadMergeSorterWithBucketingImpl extends AbstractMergeSorte } private SingleThreadFinalSortFilesMerger getFinalMerger(String bucketId) { - String storeLocation = CarbonDataProcessorUtil + String[] storeLocation = CarbonDataProcessorUtil .getLocalDataFolderLocation(sortParameters.getDatabaseName(), sortParameters.getTableName(), String.valueOf(sortParameters.getTaskNo()), bucketId, sortParameters.getSegmentId() + "", false); // Set the data file location - String dataFolderLocation = - storeLocation + File.separator + CarbonCommonConstants.SORT_TEMP_FILE_LOCATION; + String[] dataFolderLocation = CarbonDataProcessorUtil.arrayAppend(storeLocation, File.separator, + CarbonCommonConstants.SORT_TEMP_FILE_LOCATION); return new SingleThreadFinalSortFilesMerger(dataFolderLocation, sortParameters.getTableName(), sortParameters.getDimColCount(), sortParameters.getComplexDimColCount(), sortParameters.getMeasureColCount(), sortParameters.getNoDictionaryCount(), @@ -185,12 +185,13 @@ public class ParallelReadMergeSorterWithBucketingImpl extends AbstractMergeSorte } private void setTempLocation(SortParameters parameters) { - String carbonDataDirectoryPath = CarbonDataProcessorUtil + String[] carbonDataDirectoryPath = CarbonDataProcessorUtil .getLocalDataFolderLocation(parameters.getDatabaseName(), parameters.getTableName(), parameters.getTaskNo(), parameters.getPartitionID(), parameters.getSegmentId(), false); - parameters.setTempFileLocation( - carbonDataDirectoryPath + File.separator + CarbonCommonConstants.SORT_TEMP_FILE_LOCATION); + String[] tmpLocs = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath, File.separator, + CarbonCommonConstants.SORT_TEMP_FILE_LOCATION); + parameters.setTempFileLocation(tmpLocs); } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/ded8b416/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java index a8d1eef..e140d86 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java @@ -221,12 +221,13 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter } private void setTempLocation(SortParameters parameters) { - String 
carbonDataDirectoryPath = CarbonDataProcessorUtil + String[] carbonDataDirectoryPath = CarbonDataProcessorUtil .getLocalDataFolderLocation(parameters.getDatabaseName(), parameters.getTableName(), parameters.getTaskNo(), batchCount + "", parameters.getSegmentId(), false); - parameters.setTempFileLocation( - carbonDataDirectoryPath + File.separator + CarbonCommonConstants.SORT_TEMP_FILE_LOCATION); + String[] tempDirs = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath, + File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION); + parameters.setTempFileLocation(tempDirs); } @Override public UnsafeSingleThreadFinalSortFilesMerger next() { http://git-wip-us.apache.org/repos/asf/carbondata/blob/ded8b416/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java index 3d494de..ad4ebfc 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java @@ -16,7 +16,6 @@ */ package org.apache.carbondata.processing.newflow.sort.impl; -import java.io.File; import java.util.Iterator; import java.util.List; import java.util.concurrent.Callable; @@ -28,7 +27,6 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.carbondata.common.CarbonIterator; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException; import org.apache.carbondata.core.datastore.row.CarbonRow; import org.apache.carbondata.core.memory.MemoryException; @@ -43,7 +41,6 @@ import org.apache.carbondata.processing.newflow.sort.unsafe.merger.UnsafeInterme import org.apache.carbondata.processing.newflow.sort.unsafe.merger.UnsafeSingleThreadFinalSortFilesMerger; import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException; import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters; -import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; /** * It parallely reads data from array of iterates and do merge sort. 
@@ -70,13 +67,7 @@ public class UnsafeParallelReadMergeSorterImpl extends AbstractMergeSorter { @Override public void initialize(SortParameters sortParameters) { this.sortParameters = sortParameters; unsafeIntermediateFileMerger = new UnsafeIntermediateMerger(sortParameters); - String storeLocation = CarbonDataProcessorUtil - .getLocalDataFolderLocation(sortParameters.getDatabaseName(), sortParameters.getTableName(), - String.valueOf(sortParameters.getTaskNo()), sortParameters.getPartitionID(), - sortParameters.getSegmentId() + "", false); - // Set the data file location - String dataFolderLocation = - storeLocation + File.separator + CarbonCommonConstants.SORT_TEMP_FILE_LOCATION; + finalMerger = new UnsafeSingleThreadFinalSortFilesMerger(sortParameters, sortParameters.getTempFileLocation()); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/ded8b416/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java index aed2bc1..e508654 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java @@ -124,13 +124,13 @@ public class UnsafeParallelReadMergeSorterWithBucketingImpl implements Sorter { } private UnsafeSingleThreadFinalSortFilesMerger getFinalMerger(String bucketId) { - String storeLocation = CarbonDataProcessorUtil + String[] storeLocation = CarbonDataProcessorUtil .getLocalDataFolderLocation(sortParameters.getDatabaseName(), sortParameters.getTableName(), String.valueOf(sortParameters.getTaskNo()), bucketId, sortParameters.getSegmentId() + "", false); // Set the data file location - String dataFolderLocation = - storeLocation + File.separator + CarbonCommonConstants.SORT_TEMP_FILE_LOCATION; + String[] dataFolderLocation = CarbonDataProcessorUtil.arrayAppend(storeLocation, + File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION); return new UnsafeSingleThreadFinalSortFilesMerger(sortParameters, dataFolderLocation); } @@ -168,11 +168,12 @@ public class UnsafeParallelReadMergeSorterWithBucketingImpl implements Sorter { } private void setTempLocation(SortParameters parameters) { - String carbonDataDirectoryPath = CarbonDataProcessorUtil + String[] carbonDataDirectoryPath = CarbonDataProcessorUtil .getLocalDataFolderLocation(parameters.getDatabaseName(), parameters.getTableName(), parameters.getTaskNo(), parameters.getPartitionID(), parameters.getSegmentId(), false); - parameters.setTempFileLocation( - carbonDataDirectoryPath + File.separator + CarbonCommonConstants.SORT_TEMP_FILE_LOCATION); + String[] tmpLoc = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath, File.separator, + CarbonCommonConstants.SORT_TEMP_FILE_LOCATION); + parameters.setTempFileLocation(tmpLoc); } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/ded8b416/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java ---------------------------------------------------------------------- diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java index 8021b45..fd0bec4 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java @@ -22,6 +22,7 @@ import java.io.DataOutputStream; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -136,9 +137,7 @@ public class UnsafeSortDataRows { deleteSortLocationIfExists(); // create new sort temp directory - if (!new File(parameters.getTempFileLocation()).mkdirs()) { - LOGGER.info("Sort Temp Location Already Exists"); - } + CarbonDataProcessorUtil.createLocations(parameters.getTempFileLocation()); this.dataSorterAndWriterExecutorService = Executors.newFixedThreadPool(parameters.getNumberOfCores()); semaphore = new Semaphore(parameters.getNumberOfCores()); @@ -365,9 +364,12 @@ public class UnsafeSortDataRows { } if (page.isSaveToDisk()) { // create a new file every time + // create a new file and pick a temp directory randomly every time + String tmpDir = parameters.getTempFileLocation()[ + new Random().nextInt(parameters.getTempFileLocation().length)]; File sortTempFile = new File( - parameters.getTempFileLocation() + File.separator + parameters.getTableName() + System - .nanoTime() + CarbonCommonConstants.SORT_TEMP_FILE_EXT); + tmpDir + File.separator + parameters.getTableName() + + System.nanoTime() + CarbonCommonConstants.SORT_TEMP_FILE_EXT); writeData(page, sortTempFile); LOGGER.info("Time taken to sort row page with size" + page.getBuffer().getActualSize() + " and write is: " + (System.currentTimeMillis() - startTime)); http://git-wip-us.apache.org/repos/asf/carbondata/blob/ded8b416/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateMerger.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateMerger.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateMerger.java index d10c75f..93698ec 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateMerger.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateMerger.java @@ -19,6 +19,7 @@ package org.apache.carbondata.processing.newflow.sort.unsafe.merger; import java.io.File; import java.util.ArrayList; import java.util.List; +import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -106,8 +107,12 @@ public class UnsafeIntermediateMerger { * @param intermediateFiles */ private void startIntermediateMerging(File[] intermediateFiles) { + //pick a temp location randomly + String[] tempFileLocations = parameters.getTempFileLocation(); + String targetLocation = tempFileLocations[new Random().nextInt(tempFileLocations.length)]; + File file = new File( - parameters.getTempFileLocation() + File.separator + parameters.getTableName() + System + targetLocation + 
File.separator + parameters.getTableName() + System .nanoTime() + CarbonCommonConstants.MERGERD_EXTENSION); UnsafeIntermediateFileMerger merger = new UnsafeIntermediateFileMerger(parameters, intermediateFiles, file); http://git-wip-us.apache.org/repos/asf/carbondata/blob/ded8b416/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java index eb7af47..1455365 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java @@ -20,6 +20,8 @@ package org.apache.carbondata.processing.newflow.sort.unsafe.merger; import java.io.File; import java.io.FileFilter; import java.util.AbstractQueue; +import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.PriorityQueue; @@ -57,14 +59,14 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator merges) throws CarbonDataWriterException { try { - File[] filesToMergeSort = getFilesToMergeSort(); - this.fileCounter = rowPages.length + filesToMergeSort.length + merges.size(); + List filesToMergeSort = getFilesToMergeSort(); + this.fileCounter = rowPages.length + filesToMergeSort.size() + merges.size(); if (fileCounter == 0) { LOGGER.info("No files to merge sort"); return; @@ -145,20 +147,25 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator getFilesToMergeSort() { + FileFilter fileFilter = new FileFilter() { public boolean accept(File pathname) { return pathname.getName().startsWith(tableName); } - }); + }; - if (null == fileList || fileList.length < 0) { - return new File[0]; + // get all the merged files + List files = new ArrayList(tempFileLocation.length); + for (String tempLoc : tempFileLocation) + { + File[] subFiles = new File(tempLoc).listFiles(fileFilter); + if (null != subFiles && subFiles.length > 0) + { + files.addAll(Arrays.asList(subFiles)); + } } - return fileList; + + return files; } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/ded8b416/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/CarbonRowDataWriterProcessorStepImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/CarbonRowDataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/CarbonRowDataWriterProcessorStepImpl.java index 4ed588d..fe9257f 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/CarbonRowDataWriterProcessorStepImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/CarbonRowDataWriterProcessorStepImpl.java @@ -16,7 +16,6 @@ */ package org.apache.carbondata.processing.newflow.steps; -import java.io.File; import java.io.IOException; import java.util.Iterator; import java.util.concurrent.ExecutorService; @@ -88,12 +87,12 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces 
child.initialize(); } - private String getStoreLocation(CarbonTableIdentifier tableIdentifier, String partitionId) { - String storeLocation = CarbonDataProcessorUtil + private String[] getStoreLocation(CarbonTableIdentifier tableIdentifier, String partitionId) { + String[] storeLocation = CarbonDataProcessorUtil .getLocalDataFolderLocation(tableIdentifier.getDatabaseName(), tableIdentifier.getTableName(), String.valueOf(configuration.getTaskNo()), partitionId, configuration.getSegmentId() + "", false); - new File(storeLocation).mkdirs(); + CarbonDataProcessorUtil.createLocations(storeLocation); return storeLocation; } @@ -147,9 +146,10 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces } private void doExecute(Iterator iterator, int partitionId, int iteratorIndex) { - String storeLocation = getStoreLocation(tableIdentifier, String.valueOf(partitionId)); + String[] storeLocation = getStoreLocation(tableIdentifier, String.valueOf(partitionId)); CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel - .createCarbonFactDataHandlerModel(configuration, storeLocation, partitionId, iteratorIndex); + .createCarbonFactDataHandlerModel(configuration, storeLocation, partitionId, + iteratorIndex); CarbonFactHandler dataHandler = null; boolean rowsNotExist = true; while (iterator.hasNext()) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/ded8b416/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java index 00cb302..c5f2479 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java @@ -16,7 +16,6 @@ */ package org.apache.carbondata.processing.newflow.steps; -import java.io.File; import java.io.IOException; import java.util.Iterator; @@ -59,12 +58,12 @@ public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorS child.initialize(); } - private String getStoreLocation(CarbonTableIdentifier tableIdentifier, String partitionId) { - String storeLocation = CarbonDataProcessorUtil + private String[] getStoreLocation(CarbonTableIdentifier tableIdentifier, String partitionId) { + String[] storeLocation = CarbonDataProcessorUtil .getLocalDataFolderLocation(tableIdentifier.getDatabaseName(), tableIdentifier.getTableName(), String.valueOf(configuration.getTaskNo()), partitionId, configuration.getSegmentId() + "", false); - new File(storeLocation).mkdirs(); + CarbonDataProcessorUtil.createLocations(storeLocation); return storeLocation; } @@ -79,7 +78,7 @@ public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorS System.currentTimeMillis()); int i = 0; for (Iterator iterator : iterators) { - String storeLocation = getStoreLocation(tableIdentifier, String.valueOf(i)); + String[] storeLocation = getStoreLocation(tableIdentifier, String.valueOf(i)); int k = 0; while (iterator.hasNext()) { CarbonRowBatch next = iterator.next(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/ded8b416/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java 
---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java index afb3062..18155dd 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java @@ -16,7 +16,6 @@ */ package org.apache.carbondata.processing.newflow.steps; -import java.io.File; import java.io.IOException; import java.util.Iterator; @@ -66,19 +65,19 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep { child.initialize(); } - private String getStoreLocation(CarbonTableIdentifier tableIdentifier, String partitionId) { - String storeLocation = CarbonDataProcessorUtil + private String[] getStoreLocation(CarbonTableIdentifier tableIdentifier, String partitionId) { + String[] storeLocation = CarbonDataProcessorUtil .getLocalDataFolderLocation(tableIdentifier.getDatabaseName(), tableIdentifier.getTableName(), String.valueOf(configuration.getTaskNo()), partitionId, configuration.getSegmentId() + "", false); - new File(storeLocation).mkdirs(); + CarbonDataProcessorUtil.createLocations(storeLocation); return storeLocation; } public CarbonFactDataHandlerModel getDataHandlerModel(int partitionId) { CarbonTableIdentifier tableIdentifier = configuration.getTableIdentifier().getCarbonTableIdentifier(); - String storeLocation = getStoreLocation(tableIdentifier, String.valueOf(partitionId)); + String[] storeLocation = getStoreLocation(tableIdentifier, String.valueOf(partitionId)); CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel .createCarbonFactDataHandlerModel(configuration, storeLocation, partitionId, 0); return model; @@ -95,7 +94,8 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep { System.currentTimeMillis()); int i = 0; for (Iterator iterator : iterators) { - String storeLocation = getStoreLocation(tableIdentifier, String.valueOf(i)); + String[] storeLocation = getStoreLocation(tableIdentifier, String.valueOf(i)); + CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel .createCarbonFactDataHandlerModel(configuration, storeLocation, i, 0); CarbonFactHandler dataHandler = null; http://git-wip-us.apache.org/repos/asf/carbondata/blob/ded8b416/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java index 5df3112..fc575b6 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java +++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java @@ -24,6 +24,7 @@ import java.io.FileOutputStream; import java.io.IOException; import java.math.BigDecimal; import java.util.Arrays; +import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -101,9 +102,7 @@ public class SortDataRows { deleteSortLocationIfExists(); // create new sort temp directory - if (!new 
File(parameters.getTempFileLocation()).mkdirs()) { - LOGGER.info("Sort Temp Location Already Exists"); - } + CarbonDataProcessorUtil.createLocations(parameters.getTempFileLocation()); this.dataSorterAndWriterExecutorService = Executors.newFixedThreadPool(parameters.getNumberOfCores()); semaphore = new Semaphore(parameters.getNumberOfCores()); @@ -204,9 +203,11 @@ public class SortDataRows { } recordHolderList = toSort; - // create new file + // create new file and choose folder randomly + String[] tmpLocation = parameters.getTempFileLocation(); + String locationChosen = tmpLocation[new Random().nextInt(tmpLocation.length)]; File file = new File( - parameters.getTempFileLocation() + File.separator + parameters.getTableName() + + locationChosen + File.separator + parameters.getTableName() + System.nanoTime() + CarbonCommonConstants.SORT_TEMP_FILE_EXT); writeDataTofile(recordHolderList, this.entryCount, file); @@ -406,9 +407,11 @@ public class SortDataRows { new NewRowComparatorForNormalDims(parameters.getNumberOfSortColumns())); } - // create a new file every time + // create a new file and choose folder randomly every time + String[] tmpFileLocation = parameters.getTempFileLocation(); + String locationChosen = tmpFileLocation[new Random().nextInt(tmpFileLocation.length)]; File sortTempFile = new File( - parameters.getTempFileLocation() + File.separator + parameters.getTableName() + System + locationChosen + File.separator + parameters.getTableName() + System .nanoTime() + CarbonCommonConstants.SORT_TEMP_FILE_EXT); writeDataTofile(recordHolderArray, recordHolderArray.length, sortTempFile); // add sort temp filename to and arrayList. When the list size reaches 20 then http://git-wip-us.apache.org/repos/asf/carbondata/blob/ded8b416/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java index 7a7142b..16cab07 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java +++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java @@ -30,6 +30,8 @@ import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration; import org.apache.carbondata.processing.schema.metadata.SortObserver; import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; +import org.apache.commons.lang3.StringUtils; + public class SortParameters implements Serializable { private static final LogService LOGGER = @@ -37,7 +39,7 @@ public class SortParameters implements Serializable { /** * tempFileLocation */ - private String tempFileLocation; + private String[] tempFileLocation; /** * sortBufferSize */ @@ -156,11 +158,11 @@ public class SortParameters implements Serializable { return parameters; } - public String getTempFileLocation() { + public String[] getTempFileLocation() { return tempFileLocation; } - public void setTempFileLocation(String tempFileLocation) { + public void setTempFileLocation(String[] tempFileLocation) { this.tempFileLocation = tempFileLocation; } @@ -407,13 +409,15 @@ public class SortParameters implements Serializable { LOGGER.info("File Buffer Size: " + parameters.getFileBufferSize()); - String carbonDataDirectoryPath = CarbonDataProcessorUtil + 
String[] carbonDataDirectoryPath = CarbonDataProcessorUtil .getLocalDataFolderLocation(tableIdentifier.getDatabaseName(), tableIdentifier.getTableName(), configuration.getTaskNo(), configuration.getPartitionId(), configuration.getSegmentId(), false); - parameters.setTempFileLocation( - carbonDataDirectoryPath + File.separator + CarbonCommonConstants.SORT_TEMP_FILE_LOCATION); - LOGGER.info("temp file location" + parameters.getTempFileLocation()); + String[] sortTempDirs = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath, + File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION); + + parameters.setTempFileLocation(sortTempDirs); + LOGGER.info("temp file location: " + StringUtils.join(parameters.getTempFileLocation(), ",")); int numberOfCores; try { @@ -528,12 +532,13 @@ public class SortParameters implements Serializable { LOGGER.info("File Buffer Size: " + parameters.getFileBufferSize()); - String carbonDataDirectoryPath = CarbonDataProcessorUtil + String[] carbonDataDirectoryPath = CarbonDataProcessorUtil .getLocalDataFolderLocation(databaseName, tableName, taskNo, partitionID, segmentId, isCompactionFlow); - parameters.setTempFileLocation( - carbonDataDirectoryPath + File.separator + CarbonCommonConstants.SORT_TEMP_FILE_LOCATION); - LOGGER.info("temp file location" + parameters.getTempFileLocation()); + String[] sortTempDirs = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath, + File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION); + parameters.setTempFileLocation(sortTempDirs); + LOGGER.info("temp file location: " + StringUtils.join(parameters.getTempFileLocation(), ",")); int numberOfCores; try { http://git-wip-us.apache.org/repos/asf/carbondata/blob/ded8b416/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java index 429c5a3..3ed5888 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java @@ -304,7 +304,8 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { */ public void initialise() throws CarbonDataWriterException { fileManager = new FileManager(); - fileManager.setName(new File(model.getStoreLocation()).getName()); + // todo: the fileManager seems to be useless, remove it later + fileManager.setName(new File(model.getStoreLocation()[0]).getName()); setWritingConfiguration(); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/ded8b416/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java index 18eeb2a..51ec84b 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java @@ -77,7 +77,7 @@ public class CarbonFactDataHandlerModel { /** * local store location */ - private String 
storeLocation; + private String[] storeLocation; /** * flag to check whether use inverted index */ @@ -163,7 +163,7 @@ public class CarbonFactDataHandlerModel { * Create the model using @{@link CarbonDataLoadConfiguration} */ public static CarbonFactDataHandlerModel createCarbonFactDataHandlerModel( - CarbonDataLoadConfiguration configuration, String storeLocation, int bucketId, + CarbonDataLoadConfiguration configuration, String[] storeLocation, int bucketId, int taskExtension) { CarbonTableIdentifier identifier = configuration.getTableIdentifier().getCarbonTableIdentifier(); @@ -265,7 +265,7 @@ public class CarbonFactDataHandlerModel { */ public static CarbonFactDataHandlerModel getCarbonFactDataHandlerModel(CarbonLoadModel loadModel, CarbonTable carbonTable, SegmentProperties segmentProperties, String tableName, - String tempStoreLocation) { + String[] tempStoreLocation) { CarbonFactDataHandlerModel carbonFactDataHandlerModel = new CarbonFactDataHandlerModel(); carbonFactDataHandlerModel.setSchemaUpdatedTimeStamp(carbonTable.getTableLastUpdatedTime()); carbonFactDataHandlerModel.setDatabaseName(loadModel.getDatabaseName()); @@ -375,11 +375,11 @@ public class CarbonFactDataHandlerModel { this.measureCount = measureCount; } - public String getStoreLocation() { + public String[] getStoreLocation() { return storeLocation; } - public void setStoreLocation(String storeLocation) { + public void setStoreLocation(String[] storeLocation) { this.storeLocation = storeLocation; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/ded8b416/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java index 9b81979..6178cfb 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java @@ -20,6 +20,9 @@ package org.apache.carbondata.processing.store; import java.io.File; import java.io.FileFilter; import java.util.AbstractQueue; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import java.util.PriorityQueue; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; @@ -92,7 +95,7 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator { /** * tempFileLocation */ - private String tempFileLocation; + private String[] tempFileLocation; private DataType[] measureDataType; @@ -104,7 +107,7 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator { private boolean[] isNoDictionarySortColumn; - public SingleThreadFinalSortFilesMerger(String tempFileLocation, String tableName, + public SingleThreadFinalSortFilesMerger(String[] tempFileLocation, String tableName, int dimensionCount, int complexDimensionCount, int measureCount, int noDictionaryCount, DataType[] type, boolean[] isNoDictionaryColumn, boolean[] isNoDictionarySortColumn) { this.tempFileLocation = tempFileLocation; @@ -124,19 +127,35 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator { * @throws CarbonSortKeyAndGroupByException */ public void startFinalMerge() throws CarbonDataWriterException { - // get all the merged files - File file = new 
File(tempFileLocation); + List filesToMerge = getFilesToMergeSort(); + if (filesToMerge.size() == 0) + { + LOGGER.info("No file to merge in final merge stage"); + return; + } + + startSorting(filesToMerge); + } - File[] fileList = file.listFiles(new FileFilter() { + private List getFilesToMergeSort() { + FileFilter fileFilter = new FileFilter() { public boolean accept(File pathname) { return pathname.getName().startsWith(tableName); } - }); + }; - if (null == fileList || fileList.length == 0) { - return; + // get all the merged files + List files = new ArrayList(tempFileLocation.length); + for (String tempLoc : tempFileLocation) + { + File[] subFiles = new File(tempLoc).listFiles(fileFilter); + if (null != subFiles && subFiles.length > 0) + { + files.addAll(Arrays.asList(subFiles)); + } } - startSorting(fileList); + + return files; } /** @@ -147,8 +166,8 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator { * * @throws CarbonSortKeyAndGroupByException */ - private void startSorting(File[] files) throws CarbonDataWriterException { - this.fileCounter = files.length; + private void startSorting(List files) throws CarbonDataWriterException { + this.fileCounter = files.size(); if (fileCounter == 0) { LOGGER.info("No files to merge sort"); return; @@ -162,7 +181,7 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator { LOGGER.info("File Buffer Size: " + this.fileBufferSize); // create record holder heap - createRecordHolderQueue(files); + createRecordHolderQueue(); // iterate over file list and create chunk holder and add to heap LOGGER.info("Started adding first record from each file"); @@ -215,12 +234,10 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator { /** * This method will be used to create the heap which will be used to hold * the chunk of data - * - * @param listFiles list of temp files */ - private void createRecordHolderQueue(File[] listFiles) { + private void createRecordHolderQueue() { // creating record holder heap - this.recordHolderHeapLocal = new PriorityQueue(listFiles.length); + this.recordHolderHeapLocal = new PriorityQueue(fileCounter); } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/ded8b416/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java index 1bbe98c..3756273 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java @@ -28,7 +28,9 @@ import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.util.ArrayList; import java.util.Arrays; +import java.util.Comparator; import java.util.List; +import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -59,7 +61,6 @@ import org.apache.carbondata.core.util.CarbonMetadataUtil; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.core.util.NodeHolder; -import org.apache.carbondata.core.util.path.CarbonStorePath; import org.apache.carbondata.core.util.path.CarbonTablePath; import 
org.apache.carbondata.core.writer.CarbonIndexFileWriter; import org.apache.carbondata.format.BlockIndex; @@ -128,7 +129,6 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter< * executorService */ private List> executorServiceSubmitList; - private CarbonTablePath carbonTablePath; /** * data block size for one carbon data file */ @@ -179,8 +179,6 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter< CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable( dataWriterVo.getDatabaseName() + CarbonCommonConstants.UNDERSCORE + dataWriterVo .getTableName()); - carbonTablePath = CarbonStorePath.getCarbonTablePath(dataWriterVo.getStoreLocation(), - carbonTable.getCarbonTableIdentifier()); //TODO: We should delete the levelmetadata file after reading here. // so only data loading flow will need to read from cardinality file. if (null == this.localCardinality) { @@ -288,14 +286,20 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter< // update the filename with new new sequence // increment the file sequence counter initFileCount(); - this.carbonDataFileName = carbonTablePath + + //each time we initialize writer, we choose a local temp location randomly + String[] tempFileLocations = dataWriterVo.getStoreLocation(); + String chosenTempLocation = tempFileLocations[new Random().nextInt(tempFileLocations.length)]; + LOGGER.info("Randomly choose factdata temp location: " + chosenTempLocation); + + this.carbonDataFileName = CarbonTablePath .getCarbonDataFileName(fileCount, dataWriterVo.getCarbonDataFileAttributes().getTaskId(), dataWriterVo.getBucketNumber(), dataWriterVo.getTaskExtension(), "" + dataWriterVo.getCarbonDataFileAttributes().getFactTimeStamp()); String actualFileNameVal = carbonDataFileName + CarbonCommonConstants.FILE_INPROGRESS_STATUS; - FileData fileData = new FileData(actualFileNameVal, dataWriterVo.getStoreLocation()); + FileData fileData = new FileData(actualFileNameVal, chosenTempLocation); dataWriterVo.getFileManager().add(fileData); - this.carbonDataFileTempPath = dataWriterVo.getStoreLocation() + File.separator + this.carbonDataFileTempPath = chosenTempLocation + File.separator + carbonDataFileName + CarbonCommonConstants.FILE_INPROGRESS_STATUS; this.fileCount++; try { @@ -310,7 +314,7 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter< private int initFileCount() { int fileInitialCount = 0; - File[] dataFiles = new File(dataWriterVo.getStoreLocation()).listFiles(new FileFilter() { + FileFilter fileFilter = new FileFilter() { @Override public boolean accept(File pathVal) { if (!pathVal.isDirectory() && pathVal.getName().startsWith(dataWriterVo.getTableName()) && pathVal.getName().contains(CarbonCommonConstants.FACT_FILE_EXT)) { @@ -318,9 +322,26 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter< } return false; } - }); + }; + + List dataFileList = new ArrayList(); + for (String tempLoc : dataWriterVo.getStoreLocation()) { + File[] subFiles = new File(tempLoc).listFiles(fileFilter); + if (null != subFiles && subFiles.length > 0) { + dataFileList.addAll(Arrays.asList(subFiles)); + } + } + + File[] dataFiles = new File[dataFileList.size()]; + dataFileList.toArray(dataFiles); if (dataFiles != null && dataFiles.length > 0) { - Arrays.sort(dataFiles); + // since files are in different directory, we should only compare the file name + // and ignore the directory + Arrays.sort(dataFiles, new Comparator() { + @Override public int 
compare(File o1, File o2) { + return o1.getName().compareTo(o2.getName()); + } + }); String dataFileName = dataFiles[dataFiles.length - 1].getName(); try { fileInitialCount = Integer @@ -439,7 +460,12 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter< .getIndexHeader(localCardinality, thriftColumnSchemaList, dataWriterVo.getBucketNumber()); // get the block index info thrift List blockIndexThrift = CarbonMetadataUtil.getBlockIndexInfo(blockIndexInfoList); - String fileName = dataWriterVo.getStoreLocation() + File.separator + carbonTablePath + // randomly choose a temp location for index file + String[] tempLocations = dataWriterVo.getStoreLocation(); + String chosenTempLocation = tempLocations[new Random().nextInt(tempLocations.length)]; + LOGGER.info("Randomly choose index file location: " + chosenTempLocation); + + String fileName = chosenTempLocation + File.separator + CarbonTablePath .getCarbonIndexFileName(dataWriterVo.getCarbonDataFileAttributes().getTaskId(), dataWriterVo.getBucketNumber(), dataWriterVo.getTaskExtension(), "" + dataWriterVo.getCarbonDataFileAttributes().getFactTimeStamp()); http://git-wip-us.apache.org/repos/asf/carbondata/blob/ded8b416/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java index defa23a..225e031 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java @@ -28,7 +28,7 @@ import org.apache.carbondata.processing.store.file.IFileManagerComposite; */ public class CarbonDataWriterVo { - private String storeLocation; + private String[] storeLocation; private int measureCount; @@ -67,14 +67,14 @@ public class CarbonDataWriterVo { /** * @return the storeLocation */ - public String getStoreLocation() { + public String[] getStoreLocation() { return storeLocation; } /** * @param storeLocation the storeLocation to set */ - public void setStoreLocation(String storeLocation) { + public void setStoreLocation(String[] storeLocation) { this.storeLocation = storeLocation; }
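
The pattern running through this diff is: every former single temp/store location becomes a `String[]`, writers pick one of the configured directories at random for each new file, and mergers scan all of the directories when collecting files to sort-merge. The sketch below is a minimal, self-contained illustration of that pattern, not the CarbonData code itself: the class name `MultiTempDirExample`, the helper names, and the example paths are hypothetical; only the property `carbon.use.multiple.temp.dir` and the idea behind `CarbonDataProcessorUtil.createLocations(String[])` come from the patch, and the suggestion that the directories are derived from YARN local dirs is inferred from the `TestLoadDataWithYarnLocalDirs` test in the change list.

```java
import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Simplified illustration of the multi-temp-dir pattern in this patch:
// writes are spread across several local directories (one chosen at random
// per file) and the merge step scans all of them.
public final class MultiTempDirExample {

  private static final Random RANDOM = new Random();

  // Create every configured temp directory up front, in the spirit of
  // CarbonDataProcessorUtil.createLocations(String[]).
  static void createLocations(String[] locations) {
    for (String location : locations) {
      File dir = new File(location);
      if (!dir.mkdirs() && !dir.isDirectory()) {
        throw new IllegalStateException("Unable to create temp directory: " + location);
      }
    }
  }

  // Pick one directory at random for each new file so that writes are
  // distributed over all disks instead of hitting a single hot spot.
  static File newTempFile(String[] locations, String tableName, String extension) {
    String chosen = locations[RANDOM.nextInt(locations.length)];
    return new File(chosen, tableName + System.nanoTime() + extension);
  }

  // Collect candidate files from every temp directory, analogous to the
  // getFilesToMergeSort() methods added to the merger classes above.
  static List<File> listFilesToMerge(String[] locations, final String tableName) {
    FileFilter filter = new FileFilter() {
      @Override public boolean accept(File pathname) {
        return pathname.getName().startsWith(tableName);
      }
    };
    List<File> files = new ArrayList<File>();
    for (String location : locations) {
      File[] subFiles = new File(location).listFiles(filter);
      if (subFiles != null && subFiles.length > 0) {
        files.addAll(Arrays.asList(subFiles));
      }
    }
    return files;
  }

  public static void main(String[] args) throws IOException {
    // Hypothetical directories; with carbon.use.multiple.temp.dir=true the real
    // values would come from the node's configured local dirs.
    String[] tempDirs = {"/tmp/disk1/carbon_tmp", "/tmp/disk2/carbon_tmp"};
    createLocations(tempDirs);

    File sortTempFile = newTempFile(tempDirs, "sales_table", ".sorttemp");
    if (sortTempFile.createNewFile()) {
      System.out.println("Wrote sort temp file to " + sortTempFile.getAbsolutePath());
    }

    List<File> toMerge = listFilesToMerge(tempDirs, "sales_table");
    System.out.println("Files visible to the merger: " + toMerge.size());
  }
}
```

One consequence of spreading files over several directories, visible in the `AbstractFactDataWriter` hunk above, is that files can no longer be ordered by their full paths: the writer now sorts candidates by file name only, since the same logical sequence of files may live under different parent directories.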