carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ravipes...@apache.org
Subject [1/2] carbondata git commit: [CARBONDATA - 1159] Batch sort loading is not proper without synchronization
Date Mon, 12 Jun 2017 19:49:52 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master d1080df44 -> 0ad92b6a0


[CARBONDATA - 1159] Batch sort loading is not proper without synchronization


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/408de862
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/408de862
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/408de862

Branch: refs/heads/master
Commit: 408de862a0a47d56b9038d7354d5173567b43eda
Parents: d1080df
Author: dhatchayani <dhatcha.official@gmail.com>
Authored: Mon Jun 12 21:56:47 2017 +0530
Committer: ravipesala <ravi.pesala@gmail.com>
Committed: Tue Jun 13 01:18:54 2017 +0530

----------------------------------------------------------------------
 .../UnsafeBatchParallelReadMergeSorterImpl.java |  7 +-
 .../newflow/sort/unsafe/UnsafeSortDataRows.java | 72 ++++++++++++--------
 .../util/CarbonDataProcessorUtil.java           |  7 +-
 3 files changed, 56 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/408de862/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
index 7121278..8c345cb 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
@@ -147,9 +147,9 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
             }
           }
           if (i > 0) {
-            sortDataRows.getSortDataRow().addRowBatch(buffer, i);
-            rowCounter.getAndAdd(i);
             synchronized (sortDataRows) {
+              sortDataRows.getSortDataRow().addRowBatchWithOutSync(buffer, i);
+              rowCounter.getAndAdd(i);
               if (!sortDataRows.getSortDataRow().canAdd()) {
                 sortDataRows.finish();
                 sortDataRows.createSortDataRows();
@@ -197,6 +197,9 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
 
     private void createSortDataRows() {
       int inMemoryChunkSizeInMB = CarbonProperties.getInstance().getSortMemoryChunkSizeInMB();
+      if (inMemoryChunkSizeInMB > sortParameters.getBatchSortSizeinMb()) {
+        inMemoryChunkSizeInMB = sortParameters.getBatchSortSizeinMb();
+      }
       this.finalMerger = new UnsafeSingleThreadFinalSortFilesMerger(sortParameters,
           sortParameters.getTempFileLocation());
       unsafeIntermediateFileMerger = new UnsafeIntermediateMerger(sortParameters);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/408de862/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
index 415d708..074bb3b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
@@ -166,35 +166,53 @@ public class UnsafeSortDataRows {
     // if record holder list size is equal to sort buffer size then it will
     // sort the list and then write current list data to file
     synchronized (addRowsLock) {
-      for (int i = 0; i < size; i++) {
-        if (rowPage.canAdd()) {
-          bytesAdded += rowPage.addRow(rowBatch[i]);
-        } else {
-          try {
-            if (enableInMemoryIntermediateMerge) {
-              unsafeInMemoryIntermediateFileMerger.startInmemoryMergingIfPossible();
-            }
-            unsafeInMemoryIntermediateFileMerger.startFileMergingIfPossible();
-            semaphore.acquire();
-            dataSorterAndWriterExecutorService.submit(new DataSorterAndWriter(rowPage));
-            MemoryBlock memoryBlock = getMemoryBlock(inMemoryChunkSize);
-            boolean saveToDisk = !UnsafeMemoryManager.INSTANCE.isMemoryAvailable();
-            rowPage = new UnsafeCarbonRowPage(
-                parameters.getNoDictionaryDimnesionColumn(),
-                parameters.getNoDictionarySortColumn(),
-                parameters.getDimColCount() + parameters.getComplexDimColCount(),
-                parameters.getMeasureColCount(),
-                parameters.getMeasureDataType(),
-                memoryBlock,
-                saveToDisk);
-            bytesAdded += rowPage.addRow(rowBatch[i]);
-          } catch (Exception e) {
-            LOGGER.error(
-                "exception occurred while trying to acquire a semaphore lock: " + e.getMessage());
-            throw new CarbonSortKeyAndGroupByException(e);
-          }
+      addBatch(rowBatch, size);
+    }
+  }
+
+  /**
+   * This method will be used to add new row
+   *
+   * @param rowBatch new rowBatch
+   * @param size
+   * @throws CarbonSortKeyAndGroupByException problem while writing
+   */
+  public void addRowBatchWithOutSync(Object[][] rowBatch, int size)
+      throws CarbonSortKeyAndGroupByException {
+    // if record holder list size is equal to sort buffer size then it will
+    // sort the list and then write current list data to file
+    addBatch(rowBatch, size);
+  }
 
+  private void addBatch(Object[][] rowBatch, int size) throws CarbonSortKeyAndGroupByException {
+    for (int i = 0; i < size; i++) {
+      if (rowPage.canAdd()) {
+        bytesAdded += rowPage.addRow(rowBatch[i]);
+      } else {
+        try {
+          if (enableInMemoryIntermediateMerge) {
+            unsafeInMemoryIntermediateFileMerger.startInmemoryMergingIfPossible();
+          }
+          unsafeInMemoryIntermediateFileMerger.startFileMergingIfPossible();
+          semaphore.acquire();
+          dataSorterAndWriterExecutorService.submit(new DataSorterAndWriter(rowPage));
+          MemoryBlock memoryBlock = getMemoryBlock(inMemoryChunkSize);
+          boolean saveToDisk = !UnsafeMemoryManager.INSTANCE.isMemoryAvailable();
+          rowPage = new UnsafeCarbonRowPage(
+                  parameters.getNoDictionaryDimnesionColumn(),
+                  parameters.getNoDictionarySortColumn(),
+                  parameters.getDimColCount() + parameters.getComplexDimColCount(),
+                  parameters.getMeasureColCount(),
+                  parameters.getMeasureDataType(),
+                  memoryBlock,
+                  saveToDisk);
+          bytesAdded += rowPage.addRow(rowBatch[i]);
+        } catch (Exception e) {
+          LOGGER.error(
+                  "exception occurred while trying to acquire a semaphore lock: " + e.getMessage());
+          throw new CarbonSortKeyAndGroupByException(e);
         }
+
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/408de862/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index 1659be2..7bb915a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -444,9 +444,11 @@ public final class CarbonDataProcessorUtil {
             configuration.getDataLoadProperty(CarbonCommonConstants.LOAD_SORT_SCOPE)
                 .toString());
       }
+      LOGGER.warn("sort scope is set to " + sortScope);
     } catch (Exception e) {
       sortScope = SortScopeOptions.getSortScope(CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT);
-      LOGGER.warn("sort scope is set to " + sortScope);
+      LOGGER.warn("Exception occured while resolving sort scope. " +
+          "sort scope is set to " + sortScope);
     }
     return sortScope;
   }
@@ -469,8 +471,11 @@ public final class CarbonDataProcessorUtil {
             configuration.getDataLoadProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB)
                 .toString());
       }
+      LOGGER.warn("batch sort size is set to " + batchSortSizeInMb);
     } catch (Exception e) {
       batchSortSizeInMb = 0;
+      LOGGER.warn("Exception occured while resolving batch sort size. " +
+          "batch sort size is set to " + batchSortSizeInMb);
     }
     return batchSortSizeInMb;
   }


Mime
View raw message