carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kumarvisha...@apache.org
Subject carbondata git commit: [CARBONDATA-2492]Fixed thread leak issue in case of data load failure
Date Fri, 18 May 2018 09:01:46 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master 78efc7f71 -> 3f33276d5


[CARBONDATA-2492]Fixed thread leak issue in case of data load failure

Problem: The executor service is not shut down after a data load failure.

Root cause: When the sort step fails because of an exception, it does not notify its parent
thread about the failure.
Because of this, the executor service is never shut down.

Solution: Notify the parent thread about the failure.

This closes #2317


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/3f33276d
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/3f33276d
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/3f33276d

Branch: refs/heads/master
Commit: 3f33276d5926ce73243f5678ecb1e59977bd8f88
Parents: 78efc7f
Author: BJangir <babulaljangir111@gmail.com>
Authored: Fri May 18 13:01:53 2018 +0530
Committer: kumarvishal09 <kumarvishal1802@gmail.com>
Committed: Fri May 18 14:30:43 2018 +0530

----------------------------------------------------------------------
 .../apache/carbondata/hadoop/api/CarbonTableOutputFormat.java    | 2 +-
 .../loading/sort/impl/UnsafeParallelReadMergeSorterImpl.java     | 4 +++-
 .../processing/loading/sort/unsafe/UnsafeSortDataRows.java       | 1 +
 3 files changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/3f33276d/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
index b2ff3ab..e11efc4 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
@@ -253,11 +253,11 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje
               .execute(loadModel, tempStoreLocations, new CarbonIterator[] { iteratorWrapper });
         } catch (Exception e) {
           executorService.shutdownNow();
+          iteratorWrapper.closeWriter(true);
           dataLoadExecutor.close();
           // clean up the folders and files created locally for data load operation
           TableProcessingOperations.deleteLocalDataLoadFolderLocation(loadModel, false, false);
 
-          iteratorWrapper.closeWriter(true);
           throw new RuntimeException(e);
         }
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3f33276d/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterImpl.java
index 9ff1b22..e9cc986 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterImpl.java
@@ -96,7 +96,9 @@ public class UnsafeParallelReadMergeSorterImpl extends AbstractMergeSorter {
       }
       executorService.shutdown();
       executorService.awaitTermination(2, TimeUnit.DAYS);
-      processRowToNextStep(sortDataRow, sortParameters);
+      if (!sortParameters.getObserver().isFailed()) {
+        processRowToNextStep(sortDataRow, sortParameters);
+      }
     } catch (Exception e) {
       checkError();
       throw new CarbonDataLoadingException("Problem while shutdown the server ", e);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3f33276d/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
index 48fd0eb..1c6ce8d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
@@ -319,6 +319,7 @@ public class UnsafeSortDataRows {
      * @throws CarbonSortKeyAndGroupByException
      */
     public void notifyFailed(Throwable exception) throws CarbonSortKeyAndGroupByException {
+      semaphore.release();
       dataSorterAndWriterExecutorService.shutdownNow();
       unsafeInMemoryIntermediateFileMerger.close();
       parameters.getObserver().setFailed(true);


Mime
View raw message