carbondata-commits mailing list archives

From ravipes...@apache.org
Subject [1/2] carbondata git commit: Acquire semaphore before submit a producer in finish.
Date Fri, 02 Jun 2017 08:01:42 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master 50627c047 -> 4abe42739


Acquire semaphore before submit a producer in finish.


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/f715126c
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/f715126c
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/f715126c

Branch: refs/heads/master
Commit: f715126c30dec0aa224bfcae54950de2fc86dc05
Parents: 50627c0
Author: Yadong Qi <qiyadong2010@gmail.com>
Authored: Thu Jun 1 20:28:19 2017 +0800
Committer: ravipesala <ravi.pesala@gmail.com>
Committed: Fri Jun 2 13:30:30 2017 +0530

----------------------------------------------------------------------
 .../store/CarbonFactDataHandlerColumnar.java    | 20 +++++++++++++-------
 1 file changed, 13 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/f715126c/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index 5094432..edc7ece 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -488,13 +488,19 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
   public void finish() throws CarbonDataWriterException {
     // still some data is present in stores if entryCount is more
     // than 0
-    producerExecutorServiceTaskList.add(producerExecutorService
-        .submit(new Producer(blockletDataHolder, dataRows, ++writerTaskSequenceCounter, true)));
-    blockletProcessingCount.incrementAndGet();
-    processedDataCount += entryCount;
-    closeWriterExecutionService(producerExecutorService);
-    processWriteTaskSubmitList(producerExecutorServiceTaskList);
-    processingComplete = true;
+    try {
+      semaphore.acquire();
+      producerExecutorServiceTaskList.add(producerExecutorService
+          .submit(new Producer(blockletDataHolder, dataRows, ++writerTaskSequenceCounter, true)));
+      blockletProcessingCount.incrementAndGet();
+      processedDataCount += entryCount;
+      closeWriterExecutionService(producerExecutorService);
+      processWriteTaskSubmitList(producerExecutorServiceTaskList);
+      processingComplete = true;
+    } catch (InterruptedException e) {
+      LOGGER.error(e, e.getMessage());
+      throw new CarbonDataWriterException(e.getMessage(), e);
+    }
   }
 
   /**
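
The change above takes a permit from a semaphore before handing the final Producer to the executor, so that finish() presumably obeys the same limit on in-flight work as the other submit paths. A minimal, self-contained sketch of that general pattern follows. It is not CarbonData code: the class name, MAX_IN_FLIGHT, runTasks and the sleeping task body are all hypothetical, and it assumes the permit is released by the task itself when it completes, which the diff does not show. The sketch also restores the interrupt flag inside the worker, a common convention that is not taken from the commit.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

class BoundedSubmitSketch {
  // Hypothetical limit on in-flight tasks; not a value taken from CarbonData.
  static final int MAX_IN_FLIGHT = 2;

  // Submits taskCount tasks and returns the highest number seen running at once.
  static int runTasks(int taskCount) throws Exception {
    Semaphore semaphore = new Semaphore(MAX_IN_FLIGHT);
    ExecutorService executor = Executors.newFixedThreadPool(4);
    AtomicInteger inFlight = new AtomicInteger();
    AtomicInteger maxObserved = new AtomicInteger();
    List<Future<?>> futures = new ArrayList<>();
    try {
      for (int i = 0; i < taskCount; i++) {
        // Acquire BEFORE submit: the caller blocks here until a permit is free.
        semaphore.acquire();
        futures.add(executor.submit(() -> {
          try {
            int now = inFlight.incrementAndGet();
            maxObserved.accumulateAndGet(now, Math::max);
            Thread.sleep(5); // stand-in for the real producer work
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible
          } finally {
            inFlight.decrementAndGet();
            semaphore.release(); // assumption: the task returns its own permit
          }
        }));
      }
      // Wait for every submitted task, surfacing any failure.
      for (Future<?> future : futures) {
        future.get();
      }
    } finally {
      executor.shutdown();
    }
    return maxObserved.get();
  }

  public static void main(String[] args) throws Exception {
    System.out.println("max in flight: " + runTasks(10));
  }
}
```

Because each task increments the counter only after its permit has been acquired and decrements it before releasing, the observed maximum cannot exceed MAX_IN_FLIGHT even though the pool has more threads than permits.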

