carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jack...@apache.org
Subject [01/12] carbondata git commit: [CARBONDATA-1223] Fixing empty file creation in batch sort loading
Date Tue, 27 Jun 2017 14:08:07 GMT
Repository: carbondata
Updated Branches:
  refs/heads/encoding_override 30ef14e0d -> 3ecb3ec58


[CARBONDATA-1223] Fixing empty file creation in batch sort loading


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/0205fa69
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/0205fa69
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/0205fa69

Branch: refs/heads/encoding_override
Commit: 0205fa6991e2b1d3f2a807121d15a6eeb9f07714
Parents: 30ef14e
Author: dhatchayani <dhatcha.official@gmail.com>
Authored: Fri Jun 23 19:24:47 2017 +0530
Committer: dhatchayani <dhatcha.official@gmail.com>
Committed: Fri Jun 23 19:24:51 2017 +0530

----------------------------------------------------------------------
 .../UnsafeBatchParallelReadMergeSorterImpl.java | 16 +++++++++---
 .../UnsafeSingleThreadFinalSortFilesMerger.java | 26 --------------------
 .../steps/DataWriterBatchProcessorStepImpl.java | 18 ++++++++------
 3 files changed, 23 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/0205fa69/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
index 20a560d..a8d1eef 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
@@ -155,7 +155,7 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
               sortDataRows.getSortDataRow().addRowBatchWithOutSync(buffer, i);
               rowCounter.getAndAdd(i);
               if (!sortDataRows.getSortDataRow().canAdd()) {
-                sortDataRows.finish();
+                sortDataRows.finish(false);
                 sortDataRows.createSortDataRows();
               }
             }
@@ -246,7 +246,7 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
       return sortDataRow;
     }
 
-    public void finish() {
+    public void finish(boolean isFinalAttempt) {
       try {
         // if the mergerQue is empty and some CarbonDataLoadingException exception has occurred
         // then set stop process to true in the finalmerger instance
@@ -254,6 +254,9 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
             && threadStatusObserver.getThrowable() != null && threadStatusObserver
             .getThrowable() instanceof CarbonDataLoadingException) {
           finalMerger.setStopProcess(true);
+          if (isFinalAttempt) {
+            iteratorCount.decrementAndGet();
+          }
           mergerQueue.put(finalMerger);
           return;
         }
@@ -263,6 +266,9 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
         finalMerger.startFinalMerge(rowPages.toArray(new UnsafeCarbonRowPage[rowPages.size()]),
             unsafeIntermediateFileMerger.getMergedPages());
         unsafeIntermediateFileMerger.close();
+        if (isFinalAttempt) {
+          iteratorCount.decrementAndGet();
+        }
         mergerQueue.put(finalMerger);
         sortDataRow = null;
         unsafeIntermediateFileMerger = null;
@@ -284,8 +290,10 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
 
     public void finishThread() {
       synchronized (lock) {
-        if (iteratorCount.decrementAndGet() <= 0) {
-          finish();
+        if (iteratorCount.get() <= 1) {
+          finish(true);
+        } else {
+          iteratorCount.decrementAndGet();
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0205fa69/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
index acb976f..eb7af47 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
@@ -55,25 +55,6 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
   private SortParameters parameters;
 
   /**
-   * number of measures
-   */
-  private int measureCount;
-
-  /**
-   * number of dimensionCount
-   */
-  private int dimensionCount;
-
-  /**
-   * number of complexDimensionCount
-   */
-  private int noDictionaryCount;
-
-  private int complexDimensionCount;
-
-  private boolean[] isNoDictionaryDimensionColumn;
-
-  /**
    * tempFileLocation
    */
   private String tempFileLocation;
@@ -85,13 +66,6 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
   public UnsafeSingleThreadFinalSortFilesMerger(SortParameters parameters,
       String tempFileLocation) {
     this.parameters = parameters;
-    // set measure and dimension count
-    this.measureCount = parameters.getMeasureColCount();
-    this.dimensionCount = parameters.getDimColCount();
-    this.complexDimensionCount = parameters.getComplexDimColCount();
-
-    this.noDictionaryCount = parameters.getNoDictionaryCount();
-    this.isNoDictionaryDimensionColumn = parameters.getNoDictionaryDimnesionColumn();
     this.tempFileLocation = tempFileLocation;
     this.tableName = parameters.getTableName();
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0205fa69/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java
index d58835c..46c1020 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java
@@ -82,13 +82,16 @@ public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorS
         int k = 0;
         while (iterator.hasNext()) {
           CarbonRowBatch next = iterator.next();
-          CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel
-              .createCarbonFactDataHandlerModel(configuration, storeLocation, i, k++);
-          CarbonFactHandler dataHandler = CarbonFactHandlerFactory
-              .createCarbonFactHandler(model, CarbonFactHandlerFactory.FactHandlerType.COLUMNAR);
-          dataHandler.initialise();
-          processBatch(next, dataHandler);
-          finish(tableName, dataHandler);
+          // If no rows from merge sorter, then don't create a file in fact column handler
+          if (next.hasNext()) {
+            CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel
+                .createCarbonFactDataHandlerModel(configuration, storeLocation, i, k++);
+            CarbonFactHandler dataHandler = CarbonFactHandlerFactory
+                .createCarbonFactHandler(model, CarbonFactHandlerFactory.FactHandlerType.COLUMNAR);
+            dataHandler.initialise();
+            processBatch(next, dataHandler);
+            finish(tableName, dataHandler);
+          }
         }
         i++;
       }
@@ -137,6 +140,7 @@ public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorS
       dataHandler.addDataToStore(row);
       batchSize++;
     }
+    batch.close();
     rowCounter.getAndAdd(batchSize);
   }
 


Mime
View raw message