carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chenliang...@apache.org
Subject [1/2] carbondata git commit: [CARBONDATA-1281] Support multiple temp dirs for writing files while loading data
Date Thu, 27 Jul 2017 13:17:09 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master 79feac96a -> ded8b4162


http://git-wip-us.apache.org/repos/asf/carbondata/blob/ded8b416/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index 6715562..815c752 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -58,6 +58,7 @@ import org.apache.carbondata.processing.newflow.DataField;
 import org.apache.carbondata.processing.newflow.sort.SortScopeOptions;
 
 import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
 
 public final class CarbonDataProcessorUtil {
   private static final LogService LOGGER =
@@ -145,21 +146,31 @@ public final class CarbonDataProcessorUtil {
   /**
    * This method will be used to delete sort temp location is it is exites
    */
-  public static void deleteSortLocationIfExists(String tempFileLocation) {
-    // create new temp file location where this class
-    //will write all the temp files
-    File file = new File(tempFileLocation);
-
-    if (file.exists()) {
-      try {
-        CarbonUtil.deleteFoldersAndFiles(file);
-      } catch (IOException | InterruptedException e) {
-        LOGGER.error(e);
+  public static void deleteSortLocationIfExists(String[] locations) {
+    for (String loc : locations) {
+      File file = new File(loc);
+      if (file.exists()) {
+        try {
+          CarbonUtil.deleteFoldersAndFiles(file);
+        } catch (IOException | InterruptedException e) {
+          LOGGER.error(e, "Failed to delete " + loc);
+        }
       }
     }
   }
 
   /**
+   * This method will be used to create dirs
+   * @param locations locations to create
+   */
+  public static void createLocations(String[] locations) {
+    for (String loc : locations) {
+      if (!new File(loc).mkdirs()) {
+        LOGGER.warn("Error occurs while creating dirs: " + loc);
+      }
+    }
+  }
+  /**
    * This method will form the local data folder store location
    *
    * @param databaseName
@@ -169,21 +180,34 @@ public final class CarbonDataProcessorUtil {
    * @param segmentId
    * @return
    */
-  public static String getLocalDataFolderLocation(String databaseName, String tableName,
+  public static String[] getLocalDataFolderLocation(String databaseName, String tableName,
       String taskId, String partitionId, String segmentId, boolean isCompactionFlow) {
     String tempLocationKey =
         getTempStoreLocationKey(databaseName, tableName, segmentId, taskId, isCompactionFlow);
-    String baseStorePath = CarbonProperties.getInstance().getProperty(tempLocationKey);
-    if (baseStorePath == null) {
-      LOGGER.warn("Location not set for the key " + tempLocationKey);
+    String baseTempStorePath = CarbonProperties.getInstance()
+        .getProperty(tempLocationKey);
+    if (baseTempStorePath == null) {
+      LOGGER.warn("Location not set for the key " + tempLocationKey
+          + ". This will occur during a global-sort loading,"
+          + " in this case local dirs will be chosen by spark");
+      baseTempStorePath = "./store.location";
     }
+
+    String[] baseTmpStorePathArray = StringUtils.split(baseTempStorePath, File.pathSeparator);
+    String[] localDataFolderLocArray = new String[baseTmpStorePathArray.length];
+
     CarbonTable carbonTable = CarbonMetadata.getInstance()
         .getCarbonTable(databaseName + CarbonCommonConstants.UNDERSCORE + tableName);
-    CarbonTablePath carbonTablePath =
-        CarbonStorePath.getCarbonTablePath(baseStorePath, carbonTable.getCarbonTableIdentifier());
-    String carbonDataDirectoryPath =
-        carbonTablePath.getCarbonDataDirectoryPath(partitionId, segmentId + "");
-    return carbonDataDirectoryPath + File.separator + taskId;
+    for (int i = 0 ; i < baseTmpStorePathArray.length; i++) {
+      String tmpStore = baseTmpStorePathArray[i];
+      CarbonTablePath carbonTablePath =
+          CarbonStorePath.getCarbonTablePath(tmpStore, carbonTable.getCarbonTableIdentifier());
+      String carbonDataDirectoryPath =
+          carbonTablePath.getCarbonDataDirectoryPath(partitionId, segmentId + "");
+
+      localDataFolderLocArray[i] = carbonDataDirectoryPath + File.separator + taskId;
+    }
+    return localDataFolderLocArray;
   }
 
   /**
@@ -542,4 +566,22 @@ public final class CarbonDataProcessorUtil {
     return false;
   }
 
+  /**
+   * This method will return an array in which each element has the `append` strings appended
+   * @param inputArr  inputArr
+   * @param append strings to append
+   * @return result
+   */
+  public static String[] arrayAppend(String[] inputArr, String... append) {
+    String[] outArr = new String[inputArr.length];
+    StringBuffer sb = new StringBuffer();
+    for (String str : append) {
+      sb.append(str);
+    }
+    String appendStr = sb.toString();
+    for (int i = 0; i < inputArr.length; i++) {
+      outArr[i] = inputArr[i] + appendStr;
+    }
+    return outArr;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ded8b416/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
index a7c2057..4dec81d 100644
--- a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
+++ b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
@@ -403,8 +403,9 @@ public class StoreCreator {
         format.createRecordReader(blockDetails, hadoopAttemptContext);
 
     CSVRecordReaderIterator readerIterator = new CSVRecordReaderIterator(recordReader, blockDetails, hadoopAttemptContext);
+    String[] storeLocationArray = new String[] {storeLocation};
     new DataLoadExecutor().execute(loadModel,
-        storeLocation,
+        storeLocationArray,
         new CarbonIterator[]{readerIterator});
 
     info.setDatabaseName(databaseName);


Mime
View raw message