carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jack...@apache.org
Subject [06/35] carbondata git commit: [CARBONDATA-1992] Remove partitionId in CarbonTablePath
Date Fri, 09 Feb 2018 07:58:08 GMT
http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa2f075b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
index 8b87cfc..6cf1dcd 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
@@ -32,6 +32,7 @@ import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.CarbonThreadFactory;
 import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep;
 import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
 import org.apache.carbondata.processing.loading.DataField;
@@ -88,11 +89,13 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
     child.initialize();
   }
 
-  private String[] getStoreLocation(CarbonTableIdentifier tableIdentifier, String partitionId) {
-    String[] storeLocation = CarbonDataProcessorUtil
-        .getLocalDataFolderLocation(tableIdentifier.getDatabaseName(),
-            tableIdentifier.getTableName(), String.valueOf(configuration.getTaskNo()), partitionId,
-            configuration.getSegmentId() + "", false, false);
+  private String[] getStoreLocation(CarbonTableIdentifier tableIdentifier) {
+    String[] storeLocation = CarbonDataProcessorUtil.getLocalDataFolderLocation(
+        tableIdentifier.getDatabaseName(),
+        tableIdentifier.getTableName(),
+        String.valueOf(configuration.getTaskNo()), configuration.getSegmentId(),
+        false,
+        false);
     CarbonDataProcessorUtil.createLocations(storeLocation);
     return storeLocation;
   }
@@ -115,11 +118,11 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
       measureCount = configuration.getMeasureCount();
       outputLength = measureCount + (this.noDictWithComplextCount > 0 ? 1 : 0) + 1;
       CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
-          .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
+          .recordDictionaryValue2MdkAdd2FileTime(CarbonTablePath.DEPRECATED_PATITION_ID,
               System.currentTimeMillis());
 
       if (iterators.length == 1) {
-        doExecute(iterators[0], 0, 0);
+        doExecute(iterators[0], 0);
       } else {
         executorService = Executors.newFixedThreadPool(iterators.length,
             new CarbonThreadFactory("NoSortDataWriterPool:" + configuration.getTableIdentifier()
@@ -150,11 +153,10 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
     return null;
   }
 
-  private void doExecute(Iterator<CarbonRowBatch> iterator, int partitionId, int iteratorIndex) {
-    String[] storeLocation = getStoreLocation(tableIdentifier, String.valueOf(partitionId));
-    CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel
-        .createCarbonFactDataHandlerModel(configuration, storeLocation, partitionId,
-            iteratorIndex);
+  private void doExecute(Iterator<CarbonRowBatch> iterator, int iteratorIndex) {
+    String[] storeLocation = getStoreLocation(tableIdentifier);
+    CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel.createCarbonFactDataHandlerModel(
+        configuration, storeLocation, 0, iteratorIndex);
     CarbonFactHandler dataHandler = null;
     boolean rowsNotExist = true;
     while (iterator.hasNext()) {
@@ -189,10 +191,11 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
     CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordTotalRecords(rowCounter.get());
     processingComplete(dataHandler);
     CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
-        .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
+        .recordDictionaryValue2MdkAdd2FileTime(CarbonTablePath.DEPRECATED_PATITION_ID,
             System.currentTimeMillis());
     CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
-        .recordMdkGenerateTotalTime(configuration.getPartitionId(), System.currentTimeMillis());
+        .recordMdkGenerateTotalTime(CarbonTablePath.DEPRECATED_PATITION_ID,
+            System.currentTimeMillis());
   }
 
   private void processingComplete(CarbonFactHandler dataHandler) throws CarbonDataLoadingException {
@@ -298,7 +301,7 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
     }
 
     @Override public void run() {
-      doExecute(this.iterator, 0, iteratorIndex);
+      doExecute(this.iterator, iteratorIndex);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa2f075b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java
index f030d52..369c1f2 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java
@@ -24,6 +24,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep;
 import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
 import org.apache.carbondata.processing.loading.DataField;
@@ -59,13 +60,11 @@ public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorS
     child.initialize();
   }
 
-  private String[] getStoreLocation(CarbonTableIdentifier tableIdentifier, String partitionId) {
-    String[] storeLocation = CarbonDataProcessorUtil
-        .getLocalDataFolderLocation(tableIdentifier.getDatabaseName(),
-            tableIdentifier.getTableName(), String.valueOf(configuration.getTaskNo()), partitionId,
-            configuration.getSegmentId() + "", false, false);
-    CarbonDataProcessorUtil.createLocations(storeLocation);
-    return storeLocation;
+  private String[] getStoreLocation(CarbonTableIdentifier tableIdentifier) {
+    return CarbonDataProcessorUtil.getLocalDataFolderLocation(
+        tableIdentifier.getDatabaseName(), tableIdentifier.getTableName(),
+        String.valueOf(configuration.getTaskNo()),
+        configuration.getSegmentId(), false, false);
   }
 
   @Override public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException {
@@ -75,18 +74,19 @@ public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorS
     String tableName = tableIdentifier.getTableName();
     try {
       CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
-          .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
+          .recordDictionaryValue2MdkAdd2FileTime(CarbonTablePath.DEPRECATED_PATITION_ID,
               System.currentTimeMillis());
       int i = 0;
+      String[] storeLocation = getStoreLocation(tableIdentifier);
+      CarbonDataProcessorUtil.createLocations(storeLocation);
       for (Iterator<CarbonRowBatch> iterator : iterators) {
-        String[] storeLocation = getStoreLocation(tableIdentifier, String.valueOf(i));
         int k = 0;
         while (iterator.hasNext()) {
           CarbonRowBatch next = iterator.next();
           // If no rows from merge sorter, then don't create a file in fact column handler
           if (next.hasNext()) {
             CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel
-                .createCarbonFactDataHandlerModel(configuration, storeLocation, i, k++);
+                .createCarbonFactDataHandlerModel(configuration, storeLocation, 0, k++);
             CarbonFactHandler dataHandler = CarbonFactHandlerFactory
                 .createCarbonFactHandler(model, CarbonFactHandlerFactory.FactHandlerType.COLUMNAR);
             dataHandler.initialise();
@@ -119,10 +119,11 @@ public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorS
     CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordTotalRecords(rowCounter.get());
     processingComplete(dataHandler);
     CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
-        .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
+        .recordDictionaryValue2MdkAdd2FileTime(CarbonTablePath.DEPRECATED_PATITION_ID,
             System.currentTimeMillis());
     CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
-        .recordMdkGenerateTotalTime(configuration.getPartitionId(), System.currentTimeMillis());
+        .recordMdkGenerateTotalTime(CarbonTablePath.DEPRECATED_PATITION_ID,
+            System.currentTimeMillis());
   }
 
   private void processingComplete(CarbonFactHandler dataHandler) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa2f075b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java
index d321405..58009af 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java
@@ -26,6 +26,7 @@ import org.apache.carbondata.core.datastore.row.CarbonRow;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep;
 import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
 import org.apache.carbondata.processing.loading.DataField;
@@ -65,21 +66,21 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
     child.initialize();
   }
 
-  private String[] getStoreLocation(CarbonTableIdentifier tableIdentifier, String partitionId) {
+  private String[] getStoreLocation(CarbonTableIdentifier tableIdentifier) {
     String[] storeLocation = CarbonDataProcessorUtil
         .getLocalDataFolderLocation(tableIdentifier.getDatabaseName(),
-            tableIdentifier.getTableName(), String.valueOf(configuration.getTaskNo()), partitionId,
-            configuration.getSegmentId() + "", false, false);
+            tableIdentifier.getTableName(), String.valueOf(configuration.getTaskNo()),
+            configuration.getSegmentId(), false, false);
     CarbonDataProcessorUtil.createLocations(storeLocation);
     return storeLocation;
   }
 
-  public CarbonFactDataHandlerModel getDataHandlerModel(int partitionId) {
+  public CarbonFactDataHandlerModel getDataHandlerModel() {
     CarbonTableIdentifier tableIdentifier =
         configuration.getTableIdentifier().getCarbonTableIdentifier();
-    String[] storeLocation = getStoreLocation(tableIdentifier, String.valueOf(partitionId));
+    String[] storeLocation = getStoreLocation(tableIdentifier);
     return CarbonFactDataHandlerModel.createCarbonFactDataHandlerModel(configuration,
-        storeLocation, partitionId, 0);
+        storeLocation, 0, 0);
   }
 
   @Override public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException {
@@ -89,11 +90,11 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
     String tableName = tableIdentifier.getTableName();
     try {
       CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
-          .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
+          .recordDictionaryValue2MdkAdd2FileTime(CarbonTablePath.DEPRECATED_PATITION_ID,
               System.currentTimeMillis());
       int i = 0;
       for (Iterator<CarbonRowBatch> iterator : iterators) {
-        String[] storeLocation = getStoreLocation(tableIdentifier, String.valueOf(i));
+        String[] storeLocation = getStoreLocation(tableIdentifier);
 
         CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel
             .createCarbonFactDataHandlerModel(configuration, storeLocation, i, 0);
@@ -147,10 +148,11 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
     CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordTotalRecords(rowCounter.get());
     processingComplete(dataHandler);
     CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
-        .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
+        .recordDictionaryValue2MdkAdd2FileTime(CarbonTablePath.DEPRECATED_PATITION_ID,
             System.currentTimeMillis());
     CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
-        .recordMdkGenerateTotalTime(configuration.getPartitionId(), System.currentTimeMillis());
+        .recordMdkGenerateTotalTime(CarbonTablePath.DEPRECATED_PATITION_ID,
+            System.currentTimeMillis());
   }
 
   private void processingComplete(CarbonFactHandler dataHandler) throws CarbonDataLoadingException {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa2f075b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index be27866..0eadc7f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -319,7 +319,7 @@ public final class CarbonDataMergerUtil {
 
         // create entry for merged one.
         LoadMetadataDetails loadMetadataDetails = new LoadMetadataDetails();
-        loadMetadataDetails.setPartitionCount(carbonLoadModel.getPartitionId());
+        loadMetadataDetails.setPartitionCount(CarbonTablePath.DEPRECATED_PATITION_ID);
         loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS);
         long loadEnddate = CarbonUpdateUtil.readCurrentTime();
         loadMetadataDetails.setLoadEndTime(loadEnddate);
@@ -676,7 +676,7 @@ public final class CarbonDataMergerUtil {
       CarbonTableIdentifier carbonTableIdentifier, String segmentId) {
     CarbonTablePath carbonTablePath =
         CarbonStorePath.getCarbonTablePath(tablePath, carbonTableIdentifier);
-    return carbonTablePath.getCarbonDataDirectoryPath("0", segmentId);
+    return carbonTablePath.getCarbonDataDirectoryPath(segmentId);
   }
 
 
@@ -1036,7 +1036,7 @@ public final class CarbonDataMergerUtil {
 
     CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
 
-    String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", seg);
+    String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(seg);
     CarbonFile segDir =
         FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath));
     CarbonFile[] allSegmentFiles = segDir.listFiles();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa2f075b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
index 2480a39..ff65db2 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
@@ -372,7 +372,7 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
     return SortParameters
         .createSortParameters(carbonTable, carbonLoadModel.getDatabaseName(), tableName,
             dimensionColumnCount, segmentProperties.getComplexDimensions().size(), measureCount,
-            noDictionaryCount, carbonLoadModel.getPartitionId(), segmentId,
+            noDictionaryCount, segmentId,
             carbonLoadModel.getTaskNo(), noDictionaryColMapping, true);
   }
 
@@ -422,7 +422,7 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
   private void initTempStoreLocation() {
     tempStoreLocation = CarbonDataProcessorUtil
         .getLocalDataFolderLocation(carbonLoadModel.getDatabaseName(), tableName,
-            carbonLoadModel.getTaskNo(), carbonLoadModel.getPartitionId(), segmentId,
+            carbonLoadModel.getTaskNo(), segmentId,
             true, false);
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa2f075b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
index 98d150e..4d31f87 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
@@ -26,6 +26,7 @@ import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
 
@@ -356,7 +357,7 @@ public class SortParameters implements Serializable {
     CarbonProperties carbonProperties = CarbonProperties.getInstance();
     parameters.setDatabaseName(tableIdentifier.getDatabaseName());
     parameters.setTableName(tableIdentifier.getTableName());
-    parameters.setPartitionID(configuration.getPartitionId());
+    parameters.setPartitionID("0");
     parameters.setSegmentId(configuration.getSegmentId());
     parameters.setTaskNo(configuration.getTaskNo());
     parameters.setMeasureColCount(configuration.getMeasureCount());
@@ -392,10 +393,9 @@ public class SortParameters implements Serializable {
 
     LOGGER.info("File Buffer Size: " + parameters.getFileBufferSize());
 
-    String[] carbonDataDirectoryPath = CarbonDataProcessorUtil
-        .getLocalDataFolderLocation(tableIdentifier.getDatabaseName(),
-            tableIdentifier.getTableName(), configuration.getTaskNo(),
-            configuration.getPartitionId(), configuration.getSegmentId(), false, false);
+    String[] carbonDataDirectoryPath = CarbonDataProcessorUtil.getLocalDataFolderLocation(
+        tableIdentifier.getDatabaseName(), tableIdentifier.getTableName(),
+        configuration.getTaskNo(), configuration.getSegmentId(), false, false);
     String[] sortTempDirs = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath,
         File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
 
@@ -448,13 +448,13 @@ public class SortParameters implements Serializable {
 
   public static SortParameters createSortParameters(CarbonTable carbonTable, String databaseName,
       String tableName, int dimColCount, int complexDimColCount, int measureColCount,
-      int noDictionaryCount, String partitionID, String segmentId, String taskNo,
+      int noDictionaryCount, String segmentId, String taskNo,
       boolean[] noDictionaryColMaping, boolean isCompactionFlow) {
     SortParameters parameters = new SortParameters();
     CarbonProperties carbonProperties = CarbonProperties.getInstance();
     parameters.setDatabaseName(databaseName);
     parameters.setTableName(tableName);
-    parameters.setPartitionID(partitionID);
+    parameters.setPartitionID(CarbonTablePath.DEPRECATED_PATITION_ID);
     parameters.setSegmentId(segmentId);
     parameters.setTaskNo(taskNo);
     parameters.setMeasureColCount(measureColCount);
@@ -486,7 +486,7 @@ public class SortParameters implements Serializable {
     LOGGER.info("File Buffer Size: " + parameters.getFileBufferSize());
 
     String[] carbonDataDirectoryPath = CarbonDataProcessorUtil
-        .getLocalDataFolderLocation(databaseName, tableName, taskNo, partitionID, segmentId,
+        .getLocalDataFolderLocation(databaseName, tableName, taskNo, segmentId,
             isCompactionFlow, false);
     String[] sortTempDirs = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath,
         File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa2f075b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index d15152c..9f3c86f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -309,7 +309,7 @@ public class CarbonFactDataHandlerModel {
     carbonFactDataHandlerModel.setMeasureDataType(measureDataTypes);
     String carbonDataDirectoryPath = CarbonDataProcessorUtil
         .checkAndCreateCarbonStoreLocation(carbonTable.getTablePath(), loadModel.getDatabaseName(),
-            tableName, loadModel.getPartitionId(), loadModel.getSegmentId());
+            tableName, loadModel.getSegmentId());
     carbonFactDataHandlerModel.setCarbonDataDirectoryPath(carbonDataDirectoryPath);
     List<CarbonDimension> dimensionByTableName = carbonTable.getDimensionByTableName(tableName);
     boolean[] isUseInvertedIndexes = new boolean[dimensionByTableName.size()];
@@ -336,9 +336,8 @@ public class CarbonFactDataHandlerModel {
   private static String getCarbonDataFolderLocation(CarbonDataLoadConfiguration configuration) {
     AbsoluteTableIdentifier absoluteTableIdentifier = configuration.getTableIdentifier();
     CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
-    String carbonDataDirectoryPath = carbonTablePath
-        .getCarbonDataDirectoryPath(configuration.getPartitionId(),
-            configuration.getSegmentId() + "");
+    String carbonDataDirectoryPath =
+        carbonTablePath.getCarbonDataDirectoryPath(configuration.getSegmentId());
     CarbonUtil.checkAndCreateFolder(carbonDataDirectoryPath);
     return carbonDataDirectoryPath;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa2f075b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index 2a4cc00..cfe6e31 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -123,13 +123,11 @@ public final class CarbonDataProcessorUtil {
    * @param databaseName
    * @param tableName
    * @param taskId
-   * @param partitionId
    * @param segmentId
    * @return
    */
   public static String[] getLocalDataFolderLocation(String databaseName, String tableName,
-      String taskId, String partitionId, String segmentId, boolean isCompactionFlow,
-      boolean isAltPartitionFlow) {
+      String taskId, String segmentId, boolean isCompactionFlow, boolean isAltPartitionFlow) {
     String tempLocationKey =
         getTempStoreLocationKey(databaseName, tableName, segmentId, taskId, isCompactionFlow,
             isAltPartitionFlow);
@@ -150,8 +148,7 @@ public final class CarbonDataProcessorUtil {
       String tmpStore = baseTmpStorePathArray[i];
       CarbonTablePath carbonTablePath =
           CarbonStorePath.getCarbonTablePath(tmpStore, carbonTable.getCarbonTableIdentifier());
-      String carbonDataDirectoryPath =
-          carbonTablePath.getCarbonDataDirectoryPath(partitionId, segmentId + "");
+      String carbonDataDirectoryPath = carbonTablePath.getCarbonDataDirectoryPath(segmentId);
 
       localDataFolderLocArray[i] = carbonDataDirectoryPath + File.separator + taskId;
     }
@@ -378,13 +375,12 @@ public final class CarbonDataProcessorUtil {
    * @return data directory path
    */
   public static String checkAndCreateCarbonStoreLocation(String factStoreLocation,
-      String databaseName, String tableName, String partitionId, String segmentId) {
+      String databaseName, String tableName, String segmentId) {
     CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName);
     CarbonTableIdentifier carbonTableIdentifier = carbonTable.getCarbonTableIdentifier();
     CarbonTablePath carbonTablePath =
         CarbonStorePath.getCarbonTablePath(factStoreLocation, carbonTableIdentifier);
-    String carbonDataDirectoryPath =
-        carbonTablePath.getCarbonDataDirectoryPath(partitionId, segmentId);
+    String carbonDataDirectoryPath = carbonTablePath.getCarbonDataDirectoryPath(segmentId);
     CarbonUtil.checkAndCreateFolder(carbonDataDirectoryPath);
     return carbonDataDirectoryPath;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa2f075b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 00f13a5..083128d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -77,10 +77,8 @@ public final class CarbonLoaderUtil {
     CarbonTablePath carbonTablePath = CarbonStorePath
         .getCarbonTablePath(loadModel.getTablePath(), carbonTable.getCarbonTableIdentifier());
 
-    for (int i = 0; i < carbonTable.getPartitionCount(); i++) {
-      String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(i + "", currentLoad + "");
-      deleteStorePath(segmentPath);
-    }
+    String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(currentLoad + "");
+    deleteStorePath(segmentPath);
   }
 
   /**
@@ -100,7 +98,7 @@ public final class CarbonLoaderUtil {
     int fileCount = 0;
     int partitionCount = carbonTable.getPartitionCount();
     for (int i = 0; i < partitionCount; i++) {
-      String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(i + "",
+      String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(
           currentLoad + "");
       CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath,
           FileFactory.getFileType(segmentPath));
@@ -295,7 +293,7 @@ public final class CarbonLoaderUtil {
 
   private static void addToStaleFolders(CarbonTablePath carbonTablePath,
       List<CarbonFile> staleFolders, LoadMetadataDetails entry) throws IOException {
-    String path = carbonTablePath.getCarbonDataDirectoryPath("0", entry.getLoadName());
+    String path = carbonTablePath.getCarbonDataDirectoryPath(entry.getLoadName());
     // add to the deletion list only if file exist else HDFS file system will throw
     // exception while deleting the file if file path does not exist
     if (FileFactory.isFileExist(path, FileFactory.getFileType(path))) {
@@ -876,7 +874,7 @@ public final class CarbonLoaderUtil {
     CarbonTableIdentifier carbonTableIdentifier = carbonTable.getCarbonTableIdentifier();
     CarbonTablePath carbonTablePath =
         CarbonStorePath.getCarbonTablePath(carbonTable.getTablePath(), carbonTableIdentifier);
-    String segmentFolder = carbonTablePath.getCarbonDataDirectoryPath("0", segmentId);
+    String segmentFolder = carbonTablePath.getCarbonDataDirectoryPath(segmentId);
     CarbonUtil.checkAndCreateFolder(segmentFolder);
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa2f075b/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java b/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
index 02ab1d8..f9f3e20 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
@@ -48,15 +48,14 @@ public final class DeleteLoadFolders {
    * returns segment path
    *
    * @param absoluteTableIdentifier
-   * @param partitionId
    * @param oneLoad
    * @return
    */
   private static String getSegmentPath(AbsoluteTableIdentifier absoluteTableIdentifier,
-      int partitionId, LoadMetadataDetails oneLoad) {
+      LoadMetadataDetails oneLoad) {
     CarbonTablePath carbon = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
     String segmentId = oneLoad.getLoadName();
-    return carbon.getCarbonDataDirectoryPath("" + partitionId, segmentId);
+    return carbon.getCarbonDataDirectoryPath(segmentId);
   }
 
   public static void physicalFactAndMeasureMetadataDeletion(
@@ -64,7 +63,7 @@ public final class DeleteLoadFolders {
     LoadMetadataDetails[] currentDetails = SegmentStatusManager.readLoadMetadata(metadataPath);
     for (LoadMetadataDetails oneLoad : currentDetails) {
       if (checkIfLoadCanBeDeletedPhysically(oneLoad, isForceDelete)) {
-        String path = getSegmentPath(absoluteTableIdentifier, 0, oneLoad);
+        String path = getSegmentPath(absoluteTableIdentifier, oneLoad);
         boolean status = false;
         try {
           if (FileFactory.isFileExist(path, FileFactory.getFileType(path))) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa2f075b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
index e662757..7f0aef6 100644
--- a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
+++ b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
@@ -163,7 +163,6 @@ public class StoreCreator {
       loadModel.setCsvHeaderColumns(loadModel.getCsvHeader().split(","));
       loadModel.setTaskNo("0");
       loadModel.setSegmentId("0");
-      loadModel.setPartitionId("0");
       loadModel.setFactTimeStamp(System.currentTimeMillis());
       loadModel.setMaxColumns("10");
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa2f075b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
index 41dfa50..9088731 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
@@ -107,7 +107,6 @@ class StreamHandoffRDD[K, V](
       split: Partition,
       context: TaskContext
   ): Iterator[(K, V)] = {
-    carbonLoadModel.setPartitionId("0")
     carbonLoadModel.setTaskNo("" + split.index)
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     CarbonMetadata.getInstance().addCarbonTable(carbonTable)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa2f075b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
index 1c7be6a..f2274be 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
@@ -135,7 +135,7 @@ object StreamSinkFactory {
       FileFactory.mkdirs(carbonTablePath.getMetadataDirectoryPath, fileType)
     }
     val segmentId = StreamSegment.open(carbonTable)
-    val segmentDir = carbonTablePath.getSegmentDir("0", segmentId)
+    val segmentDir = carbonTablePath.getSegmentDir(segmentId)
     if (FileFactory.isFileExist(segmentDir, fileType)) {
       // recover fault
       StreamSegment.recoverSegmentIfRequired(segmentDir)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa2f075b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
index 849bf99..45bc19a 100644
--- a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
+++ b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
@@ -149,12 +149,12 @@ class CarbonAppendableStreamSink(
    * if the directory size of current segment beyond the threshold, hand off new segment
    */
   private def checkOrHandOffSegment(): Unit = {
-    val segmentDir = carbonTablePath.getSegmentDir("0", currentSegmentId)
+    val segmentDir = carbonTablePath.getSegmentDir(currentSegmentId)
     val fileType = FileFactory.getFileType(segmentDir)
     if (segmentMaxSize <= StreamSegment.size(segmentDir)) {
       val newSegmentId = StreamSegment.close(carbonTable, currentSegmentId)
       currentSegmentId = newSegmentId
-      val newSegmentDir = carbonTablePath.getSegmentDir("0", currentSegmentId)
+      val newSegmentDir = carbonTablePath.getSegmentDir(currentSegmentId)
       FileFactory.mkdirs(newSegmentDir, fileType)
 
       // TODO trigger hand off operation
@@ -251,14 +251,14 @@ object CarbonAppendableStreamSink {
 
         // update data file info in index file
         val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
-        StreamSegment.updateIndexFile(tablePath.getSegmentDir("0", segmentId))
+        StreamSegment.updateIndexFile(tablePath.getSegmentDir(segmentId))
 
       } catch {
         // catch fault of executor side
         case t: Throwable =>
           val tablePath =
             CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
-          val segmentDir = tablePath.getSegmentDir("0", segmentId)
+          val segmentDir = tablePath.getSegmentDir(segmentId)
           StreamSegment.recoverSegmentIfRequired(segmentDir)
           LOGGER.error(t, s"Aborting job ${ job.getJobID }.")
           committer.abortJob(job)


Mime
View raw message