carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jack...@apache.org
Subject carbondata git commit: [CARBONDATA-PROCESSING]Fix findbugs issues in carbondata-processing module
Date Sat, 19 Aug 2017 01:58:01 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master 500654e60 -> 06b0d0816


[CARBONDATA-PROCESSING]Fix findbugs issues in carbondata-processing module

Fix findbugs issues in carbondata-processing module

This closes #1268


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/06b0d081
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/06b0d081
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/06b0d081

Branch: refs/heads/master
Commit: 06b0d081691c6d17d099557dbbe1ee766e9de0d4
Parents: 500654e
Author: Raghunandan S <carbondatacontributions@gmail.com>
Authored: Fri Aug 18 18:14:38 2017 +0530
Committer: Jacky Li <jacky.likun@qq.com>
Committed: Sat Aug 19 09:57:23 2017 +0800

----------------------------------------------------------------------
 .../processing/datatypes/PrimitiveDataType.java |   6 --
 .../processing/merger/CarbonDataMergerUtil.java |  30 ++----
 .../merger/CarbonDataMergerUtilResult.java      |   7 ++
 .../impl/MeasureFieldConverterImpl.java         |   2 +
 .../processing/newflow/row/CarbonRowBatch.java  |  11 +-
 .../newflow/sort/SortStepRowUtil.java           |   7 +-
 .../sort/impl/ParallelReadMergeSorterImpl.java  |   9 +-
 ...arallelReadMergeSorterWithBucketingImpl.java |  10 +-
 .../UnsafeBatchParallelReadMergeSorterImpl.java |  15 +--
 .../impl/UnsafeParallelReadMergeSorterImpl.java |  10 +-
 ...arallelReadMergeSorterWithBucketingImpl.java |  24 +++--
 .../sort/unsafe/UnsafeCarbonRowPage.java        |   9 ++
 .../newflow/sort/unsafe/UnsafeSortDataRows.java |  17 +--
 .../holder/UnsafeFinalMergePageHolder.java      |  17 +++
 .../unsafe/holder/UnsafeInmemoryHolder.java     |  18 ++++
 .../holder/UnsafeInmemoryMergeHolder.java       |  17 +++
 .../holder/UnsafeSortTempFileChunkHolder.java   |  11 +-
 .../UnsafeInMemoryIntermediateDataMerger.java   |   7 +-
 .../merger/UnsafeIntermediateFileMerger.java    |   8 +-
 .../unsafe/merger/UnsafeIntermediateMerger.java |   4 +-
 .../newflow/steps/DummyClassForTest.java        | 103 -------------------
 .../sortdata/IntermediateFileMerger.java        |  10 +-
 .../sortandgroupby/sortdata/SortDataRows.java   |  20 ++--
 .../sortdata/SortIntermediateFileMerger.java    |   2 +-
 .../sortdata/SortTempFileChunkHolder.java       |  10 +-
 .../store/CarbonFactDataHandlerColumnar.java    |   6 +-
 .../store/SingleThreadFinalSortFilesMerger.java |  32 +++---
 27 files changed, 194 insertions(+), 228 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/06b0d081/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
index e7e48e9..729f9e3 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
@@ -71,11 +71,6 @@ public class PrimitiveDataType implements GenericDataType<Object> {
   private String columnId;
 
   /**
-   * dimension ordinal of primitive type column
-   */
-  private int dimensionOrdinal;
-
-  /**
    * key size
    */
   private int keySize;
@@ -105,7 +100,6 @@ public class PrimitiveDataType implements GenericDataType<Object> {
     this.name = name;
     this.parentname = parentname;
     this.columnId = columnId;
-    this.dimensionOrdinal = dimensionOrdinal;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06b0d081/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index ce9c433..86e9eff 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -169,8 +169,7 @@ public final class CarbonDataMergerUtil {
 
     String timestamp = "" + carbonLoadModel.getFactTimeStamp();
 
-    List<String> updatedDeltaFilesList =
-        new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    List<String> updatedDeltaFilesList = null;
 
     // This routine updateLoadMetadataIUDCompactionMergeStatus is suppose to update
     // two files as it is only called during IUD_UPDDEL_DELTA_COMPACTION. Along with
@@ -259,23 +258,16 @@ public final class CarbonDataMergerUtil {
             }
           }
 
-          try {
-            segmentUpdateStatusManager
-                .writeLoadDetailsIntoFile(Arrays.asList(updateLists), timestamp);
-            segmentStatusManager
-                .writeLoadDetailsIntoFile(carbonTablePath.getTableStatusFilePath(), loadDetails);
-            status = true;
-          } catch (IOException e) {
-            LOGGER.error(
-                "Error while writing metadata. The metadata file path is " + carbonTablePath
-                    .getMetadataDirectoryPath());
-            status = false;
-          }
+          segmentUpdateStatusManager
+              .writeLoadDetailsIntoFile(Arrays.asList(updateLists), timestamp);
+          segmentStatusManager
+              .writeLoadDetailsIntoFile(carbonTablePath.getTableStatusFilePath(), loadDetails);
+          status = true;
         } else {
           LOGGER.error("Not able to acquire the lock.");
           status = false;
         }
-      } catch (Exception e) {
+      } catch (IOException e) {
         LOGGER.error("Error while updating metadata. The metadata file path is " + carbonTablePath
             .getMetadataDirectoryPath());
         status = false;
@@ -456,13 +448,7 @@ public final class CarbonDataMergerUtil {
       @Override public int compare(LoadMetadataDetails seg1, LoadMetadataDetails seg2) {
         double seg1Id = Double.parseDouble(seg1.getLoadName());
         double seg2Id = Double.parseDouble(seg2.getLoadName());
-        if (seg1Id - seg2Id < 0) {
-          return -1;
-        }
-        if (seg1Id - seg2Id > 0) {
-          return 1;
-        }
-        return 0;
+        return Double.compare(seg1Id, seg2Id);
       }
     });
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06b0d081/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtilResult.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtilResult.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtilResult.java
index aa3d801..cf1e22d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtilResult.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtilResult.java
@@ -30,4 +30,11 @@ public final class CarbonDataMergerUtilResult extends SegmentUpdateDetails {
     compactionStatus = status;
   }
 
+  @Override public boolean equals(Object obj) {
+    return super.equals(obj);
+  }
+
+  @Override public int hashCode() {
+    return super.hashCode();
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06b0d081/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/MeasureFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/MeasureFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/MeasureFieldConverterImpl.java
index 58fa88e..8e20b8f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/MeasureFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/MeasureFieldConverterImpl.java
@@ -65,6 +65,8 @@ public class MeasureFieldConverterImpl implements FieldConverter {
     if (value == null || isNull) {
       String message = logHolder.getColumnMessageMap().get(measure.getColName());
       if (null == message) {
+        message = CarbonDataProcessorUtil
+            .prepareFailureReason(measure.getColName(), measure.getDataType());
         logHolder.getColumnMessageMap().put(measure.getColName(), message);
       }
       row.update(null, index);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06b0d081/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRowBatch.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRowBatch.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRowBatch.java
index e8eb071..1de55e0 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRowBatch.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRowBatch.java
@@ -17,9 +17,12 @@
 
 package org.apache.carbondata.processing.newflow.row;
 
+import java.util.NoSuchElementException;
+
 import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
 
+
 /**
  * Batch of rows.
  */
@@ -47,8 +50,12 @@ public class CarbonRowBatch extends CarbonIterator<CarbonRow> {
     return index < size;
   }
 
-  @Override public CarbonRow next() {
-    return rowBatch[index++];
+  @Override
+  public CarbonRow next() throws NoSuchElementException {
+    if (hasNext()) {
+      return rowBatch[index++];
+    }
+    throw new NoSuchElementException("no more elements to iterate");
   }
 
   @Override public void remove() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06b0d081/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortStepRowUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortStepRowUtil.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortStepRowUtil.java
index 7857f4e..5238c3c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortStepRowUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortStepRowUtil.java
@@ -58,12 +58,7 @@ public class SortStepRowUtil {
 
       // read measure values
       for (int i = 0; i < measureCount; i++) {
-        if (needConvertDecimalToByte) {
-          measures[index++] = data[allCount];
-        } else {
-          measures[index++] = data[allCount];
-        }
-
+        measures[index++] = data[allCount];
         allCount++;
       }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06b0d081/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
index c3cf3c0..5a8a2c8 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
@@ -18,7 +18,6 @@ package org.apache.carbondata.processing.newflow.sort.impl;
 
 import java.io.File;
 import java.util.Iterator;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -100,7 +99,7 @@ public class ParallelReadMergeSorterImpl extends AbstractMergeSorter {
 
     try {
       for (int i = 0; i < iterators.length; i++) {
-        executorService.submit(
+        executorService.execute(
             new SortIteratorThread(iterators[i], sortDataRow, batchSize, rowCounter,
                 threadStatusObserver));
       }
@@ -183,7 +182,7 @@ public class ParallelReadMergeSorterImpl extends AbstractMergeSorter {
   /**
    * This thread iterates the iterator and adds the rows to @{@link SortDataRows}
    */
-  private static class SortIteratorThread implements Callable<Void> {
+  private static class SortIteratorThread implements Runnable {
 
     private Iterator<CarbonRowBatch> iterator;
 
@@ -206,7 +205,7 @@ public class ParallelReadMergeSorterImpl extends AbstractMergeSorter {
     }
 
     @Override
-    public Void call() throws CarbonDataLoadingException {
+    public void run() {
       try {
         while (iterator.hasNext()) {
           CarbonRowBatch batch = iterator.next();
@@ -225,9 +224,7 @@ public class ParallelReadMergeSorterImpl extends AbstractMergeSorter {
       } catch (Exception e) {
         LOGGER.error(e);
         observer.notifyFailed(e);
-        throw new CarbonDataLoadingException(e);
       }
-      return null;
     }
 
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06b0d081/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
index 851c384..7e013e0 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
@@ -18,7 +18,6 @@ package org.apache.carbondata.processing.newflow.sort.impl;
 
 import java.io.File;
 import java.util.Iterator;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -104,7 +103,7 @@ public class ParallelReadMergeSorterWithBucketingImpl extends AbstractMergeSorte
     final int batchSize = CarbonProperties.getInstance().getBatchSize();
     try {
       for (int i = 0; i < iterators.length; i++) {
-        executorService.submit(new SortIteratorThread(iterators[i], sortDataRows, rowCounter,
+        executorService.execute(new SortIteratorThread(iterators[i], sortDataRows, rowCounter,
             this.threadStatusObserver));
       }
       executorService.shutdown();
@@ -197,7 +196,7 @@ public class ParallelReadMergeSorterWithBucketingImpl extends AbstractMergeSorte
   /**
    * This thread iterates the iterator and adds the rows to @{@link SortDataRows}
    */
-  private static class SortIteratorThread implements Callable<Void> {
+  private static class SortIteratorThread implements Runnable {
 
     private Iterator<CarbonRowBatch> iterator;
 
@@ -215,7 +214,8 @@ public class ParallelReadMergeSorterWithBucketingImpl extends AbstractMergeSorte
       this.threadStatusObserver = observer;
     }
 
-    @Override public Void call() throws CarbonDataLoadingException {
+    @Override
+    public void run() {
       try {
         while (iterator.hasNext()) {
           CarbonRowBatch batch = iterator.next();
@@ -234,9 +234,7 @@ public class ParallelReadMergeSorterWithBucketingImpl extends AbstractMergeSorte
       } catch (Exception e) {
         LOGGER.error(e);
         this.threadStatusObserver.notifyFailed(e);
-        throw new CarbonDataLoadingException(e);
       }
-      return null;
     }
 
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06b0d081/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
index ebb85b4..056c96b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
@@ -20,7 +20,6 @@ import java.io.File;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -83,7 +82,7 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
 
     try {
       for (int i = 0; i < iterators.length; i++) {
-        executorService.submit(
+        executorService.execute(
             new SortIteratorThread(iterators[i], sortBatchHolder, batchSize, rowCounter,
                 this.threadStatusObserver));
       }
@@ -118,7 +117,7 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
   /**
    * This thread iterates the iterator and adds the rows
    */
-  private static class SortIteratorThread implements Callable<Void> {
+  private static class SortIteratorThread implements Runnable {
 
     private Iterator<CarbonRowBatch> iterator;
 
@@ -139,7 +138,8 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
       this.threadStatusObserver = threadStatusObserver;
     }
 
-    @Override public Void call() throws CarbonDataLoadingException {
+    @Override
+    public void run() {
       try {
         while (iterator.hasNext()) {
           CarbonRowBatch batch = iterator.next();
@@ -164,11 +164,9 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
       } catch (Exception e) {
         LOGGER.error(e);
         this.threadStatusObserver.notifyFailed(e);
-        throw new CarbonDataLoadingException(e);
       } finally {
         sortDataRows.finishThread();
       }
-      return null;
     }
 
   }
@@ -283,7 +281,10 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
         // thread from waiting.
         if (finalMerger != null) {
           finalMerger.setStopProcess(true);
-          mergerQueue.offer(finalMerger);
+          boolean offered = mergerQueue.offer(finalMerger);
+          if (!offered) {
+            throw new CarbonDataLoadingException(e);
+          }
         }
         throw new CarbonDataLoadingException(e);
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06b0d081/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java
index ad4ebfc..a0d43ba 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java
@@ -18,7 +18,6 @@ package org.apache.carbondata.processing.newflow.sort.impl;
 
 import java.util.Iterator;
 import java.util.List;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -88,7 +87,7 @@ public class UnsafeParallelReadMergeSorterImpl extends AbstractMergeSorter {
 
     try {
       for (int i = 0; i < iterators.length; i++) {
-        executorService.submit(
+        executorService.execute(
             new SortIteratorThread(iterators[i], sortDataRow, batchSize, rowCounter,
                 this.threadStatusObserver));
       }
@@ -168,7 +167,7 @@ public class UnsafeParallelReadMergeSorterImpl extends AbstractMergeSorter {
   /**
    * This thread iterates the iterator and adds the rows
    */
-  private static class SortIteratorThread implements Callable<Void> {
+  private static class SortIteratorThread implements Runnable {
 
     private Iterator<CarbonRowBatch> iterator;
 
@@ -190,7 +189,8 @@ public class UnsafeParallelReadMergeSorterImpl extends AbstractMergeSorter {
       this.threadStatusObserver = threadStatusObserver;
     }
 
-    @Override public Void call() throws CarbonDataLoadingException {
+    @Override
+    public void run() {
       try {
         while (iterator.hasNext()) {
           CarbonRowBatch batch = iterator.next();
@@ -209,9 +209,7 @@ public class UnsafeParallelReadMergeSorterImpl extends AbstractMergeSorter {
       } catch (Exception e) {
         LOGGER.error(e);
         this.threadStatusObserver.notifyFailed(e);
-        throw new CarbonDataLoadingException(e);
       }
-      return null;
     }
 
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06b0d081/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java
index f000619..54e0180 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java
@@ -20,7 +20,6 @@ package org.apache.carbondata.processing.newflow.sort.impl;
 import java.io.File;
 import java.util.Iterator;
 import java.util.List;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -37,7 +36,7 @@ import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
 import org.apache.carbondata.processing.newflow.DataField;
 import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
 import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
-import org.apache.carbondata.processing.newflow.sort.Sorter;
+import org.apache.carbondata.processing.newflow.sort.AbstractMergeSorter;
 import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage;
 import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeSortDataRows;
 import org.apache.carbondata.processing.newflow.sort.unsafe.merger.UnsafeIntermediateMerger;
@@ -52,7 +51,7 @@ import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
  * This step is specifically for bucketing, it sorts each bucket data separately and write to
  * temp files.
  */
-public class UnsafeParallelReadMergeSorterWithBucketingImpl implements Sorter {
+public class UnsafeParallelReadMergeSorterWithBucketingImpl extends AbstractMergeSorter {
 
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(
@@ -95,17 +94,21 @@ public class UnsafeParallelReadMergeSorterWithBucketingImpl implements Sorter {
       throw new CarbonDataLoadingException(e);
     }
     ExecutorService executorService = Executors.newFixedThreadPool(iterators.length);
+    this.threadStatusObserver = new ThreadStatusObserver(executorService);
     final int batchSize = CarbonProperties.getInstance().getBatchSize();
     try {
       for (int i = 0; i < iterators.length; i++) {
-        executorService.submit(new SortIteratorThread(iterators[i], sortDataRows));
+        executorService.execute(new SortIteratorThread(iterators[i], sortDataRows, this
+            .threadStatusObserver));
       }
       executorService.shutdown();
       executorService.awaitTermination(2, TimeUnit.DAYS);
       processRowToNextStep(sortDataRows, sortParameters);
     } catch (Exception e) {
+      checkError();
       throw new CarbonDataLoadingException("Problem while shutdown the server ", e);
     }
+    checkError();
     try {
       for (int i = 0; i < intermediateFileMergers.length; i++) {
         intermediateFileMergers[i].finish();
@@ -180,19 +183,23 @@ public class UnsafeParallelReadMergeSorterWithBucketingImpl implements Sorter {
   /**
    * This thread iterates the iterator and adds the rows to @{@link UnsafeSortDataRows}
    */
-  private static class SortIteratorThread implements Callable<Void> {
+  private static class SortIteratorThread implements Runnable {
 
     private Iterator<CarbonRowBatch> iterator;
 
     private UnsafeSortDataRows[] sortDataRows;
 
+    private ThreadStatusObserver threadStatusObserver;
+
     public SortIteratorThread(Iterator<CarbonRowBatch> iterator,
-        UnsafeSortDataRows[] sortDataRows) {
+        UnsafeSortDataRows[] sortDataRows, ThreadStatusObserver threadStatusObserver) {
       this.iterator = iterator;
       this.sortDataRows = sortDataRows;
+      this.threadStatusObserver = threadStatusObserver;
     }
 
-    @Override public Void call() throws CarbonDataLoadingException {
+    @Override
+    public void run() {
       try {
         while (iterator.hasNext()) {
           CarbonRowBatch batch = iterator.next();
@@ -209,9 +216,8 @@ public class UnsafeParallelReadMergeSorterWithBucketingImpl implements Sorter {
         }
       } catch (Exception e) {
         LOGGER.error(e);
-        throw new CarbonDataLoadingException(e);
+        this.threadStatusObserver.notifyFailed(e);
       }
-      return null;
     }
 
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06b0d081/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java
index 20b60c9..8b23437 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java
@@ -157,6 +157,9 @@ public class UnsafeCarbonRowPage {
                     address + size, bigDecimalInBytes.length);
             size += bigDecimalInBytes.length;
             break;
+          default:
+            throw  new IllegalArgumentException("unsupported data type:" +
+                measureDataType[mesCount]);
         }
         set(nullSetWords, mesCount);
       } else {
@@ -240,6 +243,9 @@ public class UnsafeCarbonRowPage {
             size += bigDecimalInBytes.length;
             rowToFill[dimensionSize + mesCount] = DataTypeUtil.byteToBigDecimal(bigDecimalInBytes);
             break;
+          default:
+            throw new IllegalArgumentException("unsupported data type:" +
+                measureDataType[mesCount]);
         }
       } else {
         rowToFill[dimensionSize + mesCount] = null;
@@ -326,6 +332,9 @@ public class UnsafeCarbonRowPage {
             stream.writeShort(aShort);
             stream.write(bigDecimalInBytes);
             break;
+          default:
+            throw new IllegalArgumentException("unsupported data type:" +
+                measureDataType[mesCount]);
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06b0d081/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
index d0bb4f6..dda0d89 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
@@ -23,7 +23,6 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.Random;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Semaphore;
@@ -102,7 +101,7 @@ public class UnsafeSortDataRows {
     this.threadStatusObserver = new ThreadStatusObserver();
     this.taskId = ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId();
     this.inMemoryChunkSize = inMemoryChunkSize;
-    this.inMemoryChunkSize = inMemoryChunkSize * 1024 * 1024;
+    this.inMemoryChunkSize = inMemoryChunkSize * 1024L * 1024L;
     enableInMemoryIntermediateMerge = Boolean.parseBoolean(CarbonProperties.getInstance()
         .getProperty(CarbonCommonConstants.ENABLE_INMEMORY_MERGE_SORT,
             CarbonCommonConstants.ENABLE_INMEMORY_MERGE_SORT_DEFAULT));
@@ -186,7 +185,7 @@ public class UnsafeSortDataRows {
           }
           unsafeInMemoryIntermediateFileMerger.startFileMergingIfPossible();
           semaphore.acquire();
-          dataSorterAndWriterExecutorService.submit(new DataSorterAndWriter(rowPage));
+          dataSorterAndWriterExecutorService.execute(new DataSorterAndWriter(rowPage));
           MemoryBlock memoryBlock =
               UnsafeMemoryManager.allocateMemoryWithRetry(this.taskId, inMemoryChunkSize);
           boolean saveToDisk =
@@ -342,14 +341,15 @@ public class UnsafeSortDataRows {
    * This class is responsible for sorting and writing the object
    * array which holds the records equal to given array size
    */
-  private class DataSorterAndWriter implements Callable<Void> {
+  private class DataSorterAndWriter implements Runnable {
     private UnsafeCarbonRowPage page;
 
     public DataSorterAndWriter(UnsafeCarbonRowPage rowPage) {
       this.page = rowPage;
     }
 
-    @Override public Void call() throws Exception {
+    @Override
+    public void run() {
       try {
         long startTime = System.currentTimeMillis();
         TimSort<UnsafeCarbonRow, IntPointerBuffer> timSort = new TimSort<>(
@@ -399,11 +399,14 @@ public class UnsafeSortDataRows {
                   + (System.currentTimeMillis() - startTime));
         }
       } catch (Throwable e) {
-        threadStatusObserver.notifyFailed(e);
+        try {
+          threadStatusObserver.notifyFailed(e);
+        } catch (CarbonSortKeyAndGroupByException ex) {
+          LOGGER.error(e);
+        }
       } finally {
         semaphore.release();
       }
-      return null;
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06b0d081/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeFinalMergePageHolder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
index 397de63..f00dd45 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
@@ -76,6 +76,23 @@ public class UnsafeFinalMergePageHolder implements SortTempChunkHolder {
     return comparator.compare(currentRow, o.getRow());
   }
 
+  @Override public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+
+    if (!(obj instanceof UnsafeFinalMergePageHolder)) {
+      return false;
+    }
+
+    UnsafeFinalMergePageHolder o = (UnsafeFinalMergePageHolder) obj;
+    return this == o;
+  }
+
+  @Override public int hashCode() {
+    return super.hashCode();
+  }
+
   public int numberOfRows() {
     return actualSize;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06b0d081/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryHolder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryHolder.java
index 048f4f8..20d9894 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryHolder.java
@@ -72,6 +72,24 @@ public class UnsafeInmemoryHolder implements SortTempChunkHolder {
     return comparator.compare(currentRow, o.getRow());
   }
 
+  @Override public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+
+    if (!(obj instanceof UnsafeInmemoryHolder)) {
+      return false;
+    }
+
+    UnsafeInmemoryHolder o = (UnsafeInmemoryHolder)obj;
+
+    return this == o;
+  }
+
+  @Override public int hashCode() {
+    return super.hashCode();
+  }
+
   public int numberOfRows() {
     return actualSize;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06b0d081/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryMergeHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryMergeHolder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryMergeHolder.java
index 390dbf5..fa4534f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryMergeHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryMergeHolder.java
@@ -78,6 +78,23 @@ public class UnsafeInmemoryMergeHolder implements Comparable<UnsafeInmemoryMerge
     return comparator.compare(currentRow, baseObject, o.getRow(), o.getBaseObject());
   }
 
+  @Override public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+
+    if (!(obj instanceof UnsafeInmemoryMergeHolder)) {
+      return false;
+    }
+
+    UnsafeInmemoryMergeHolder o = (UnsafeInmemoryMergeHolder)obj;
+    return this == o;
+  }
+
+  @Override public int hashCode() {
+    return super.hashCode();
+  }
+
   public Object getBaseObject() {
     return baseObject;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06b0d081/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
index 7fb9b6e..f5316e6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
@@ -344,11 +344,14 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
               stream.readFully(bigDecimalInBytes);
               row[dimensionCount + mesCount] = DataTypeUtil.byteToBigDecimal(bigDecimalInBytes);
               break;
+            default:
+              throw new IllegalArgumentException("unsupported data type:" +
+                  measureDataType[mesCount]);
           }
         }
       }
       return row;
-    } catch (Exception e) {
+    } catch (IOException e) {
       throw new CarbonSortKeyAndGroupByException(e);
     }
   }
@@ -397,12 +400,16 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
   }
 
   @Override public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+
     if (!(obj instanceof UnsafeSortTempFileChunkHolder)) {
       return false;
     }
     UnsafeSortTempFileChunkHolder o = (UnsafeSortTempFileChunkHolder) obj;
 
-    return o.compareTo(o) == 0;
+    return this == o;
   }
 
   @Override public int hashCode() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06b0d081/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeInMemoryIntermediateDataMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeInMemoryIntermediateDataMerger.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeInMemoryIntermediateDataMerger.java
index 6adbac8..5480838 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeInMemoryIntermediateDataMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeInMemoryIntermediateDataMerger.java
@@ -19,7 +19,6 @@ package org.apache.carbondata.processing.newflow.sort.unsafe.merger;
 
 import java.util.AbstractQueue;
 import java.util.PriorityQueue;
-import java.util.concurrent.Callable;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -28,7 +27,7 @@ import org.apache.carbondata.processing.newflow.sort.unsafe.holder.UnsafeCarbonR
 import org.apache.carbondata.processing.newflow.sort.unsafe.holder.UnsafeInmemoryMergeHolder;
 import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
 
-public class UnsafeInMemoryIntermediateDataMerger implements Callable<Void> {
+public class UnsafeInMemoryIntermediateDataMerger implements Runnable {
   /**
    * LOGGER
    */
@@ -68,7 +67,8 @@ public class UnsafeInMemoryIntermediateDataMerger implements Callable<Void> {
     this.entryCount = 0;
   }
 
-  @Override public Void call() throws Exception {
+  @Override
+  public void run() {
     long intermediateMergeStartTime = System.currentTimeMillis();
     int holderCounterConst = holderCounter;
     try {
@@ -83,7 +83,6 @@ public class UnsafeInMemoryIntermediateDataMerger implements Callable<Void> {
     } catch (Exception e) {
       LOGGER.error(e, "Problem while intermediate merging");
     }
-    return null;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06b0d081/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
index c67e093..63f6aab 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
@@ -27,7 +27,6 @@ import java.nio.ByteBuffer;
 import java.util.AbstractQueue;
 import java.util.Arrays;
 import java.util.PriorityQueue;
-import java.util.concurrent.Callable;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -41,7 +40,7 @@ import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
 import org.apache.carbondata.processing.sortandgroupby.sortdata.TempSortFileWriter;
 import org.apache.carbondata.processing.sortandgroupby.sortdata.TempSortFileWriterFactory;
 
-public class UnsafeIntermediateFileMerger implements Callable<Void> {
+public class UnsafeIntermediateFileMerger implements Runnable {
   /**
    * LOGGER
    */
@@ -100,7 +99,8 @@ public class UnsafeIntermediateFileMerger implements Callable<Void> {
     rowData = ByteBuffer.allocate(2 * 1024 * 1024);
   }
 
-  @Override public Void call() throws Exception {
+  @Override
+  public void run() {
     long intermediateMergeStartTime = System.currentTimeMillis();
     int fileConterConst = fileCounter;
     boolean isFailed = false;
@@ -134,8 +134,6 @@ public class UnsafeIntermediateFileMerger implements Callable<Void> {
         }
       }
     }
-
-    return null;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06b0d081/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateMerger.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateMerger.java
index 93698ec..49791e8 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateMerger.java
@@ -116,7 +116,7 @@ public class UnsafeIntermediateMerger {
             .nanoTime() + CarbonCommonConstants.MERGERD_EXTENSION);
     UnsafeIntermediateFileMerger merger =
         new UnsafeIntermediateFileMerger(parameters, intermediateFiles, file);
-    executorService.submit(merger);
+    executorService.execute(merger);
   }
 
   public void startInmemoryMergingIfPossible() throws CarbonSortKeyAndGroupByException {
@@ -149,7 +149,7 @@ public class UnsafeIntermediateMerger {
     UnsafeInMemoryIntermediateDataMerger merger =
         new UnsafeInMemoryIntermediateDataMerger(rowPages, totalRows);
     mergedPages.add(merger);
-    executorService.submit(merger);
+    executorService.execute(merger);
   }
 
   private int getTotalNumberOfRows(List<UnsafeCarbonRowPage> unsafeCarbonRowPages) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06b0d081/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DummyClassForTest.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DummyClassForTest.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DummyClassForTest.java
deleted file mode 100644
index e1c74a8..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DummyClassForTest.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.newflow.steps;
-
-import java.util.Iterator;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
-import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
-import org.apache.carbondata.processing.newflow.DataField;
-import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
-import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
-
-/**
- * DummyClassForTest
- */
-public class DummyClassForTest extends AbstractDataLoadProcessorStep {
-
-  private ExecutorService executorService;
-
-  public DummyClassForTest(CarbonDataLoadConfiguration configuration,
-      AbstractDataLoadProcessorStep child) {
-    super(configuration, child);
-  }
-
-  @Override public DataField[] getOutput() {
-    return child.getOutput();
-  }
-
-  @Override public void initialize() throws CarbonDataLoadingException {
-
-  }
-
-  @Override protected String getStepName() {
-    return "Dummy";
-  }
-
-  @Override public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException {
-    Iterator<CarbonRowBatch>[] iterators = child.execute();
-    this.executorService = Executors.newFixedThreadPool(iterators.length);
-
-    try {
-      for (int i = 0; i < iterators.length; i++) {
-        executorService.submit(new DummyThread(iterators[i]));
-      }
-      executorService.shutdown();
-      executorService.awaitTermination(2, TimeUnit.DAYS);
-    } catch (Exception e) {
-      throw new CarbonDataLoadingException("Problem while shutdown the server ", e);
-    }
-    return null;
-  }
-
-  @Override protected CarbonRow processRow(CarbonRow row) {
-    return null;
-  }
-}
-
-/**
- * This thread iterates the iterator
- */
-class DummyThread implements Callable<Void> {
-
-  private Iterator<CarbonRowBatch> iterator;
-
-  public DummyThread(Iterator<CarbonRowBatch> iterator) {
-    this.iterator = iterator;
-  }
-
-  @Override public Void call() throws CarbonDataLoadingException {
-    try {
-      while (iterator.hasNext()) {
-        CarbonRowBatch batch = iterator.next();
-        while (batch.hasNext()) {
-          CarbonRow row = batch.next();
-          // do nothing
-        }
-      }
-
-    } catch (Exception e) {
-      throw new CarbonDataLoadingException(e);
-    }
-    return null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06b0d081/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java
index 653da7b..7c6a889 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java
@@ -25,7 +25,6 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.AbstractQueue;
 import java.util.PriorityQueue;
-import java.util.concurrent.Callable;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -34,7 +33,7 @@ import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.NonDictionaryUtil;
 import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
 
-public class IntermediateFileMerger implements Callable<Void> {
+public class IntermediateFileMerger implements Runnable {
   /**
    * LOGGER
    */
@@ -101,7 +100,8 @@ public class IntermediateFileMerger implements Callable<Void> {
     noDictionarycolumnMapping = mergerParameters.getNoDictionaryDimnesionColumn();
   }
 
-  @Override public Void call() throws Exception {
+  @Override
+  public void run() {
     long intermediateMergeStartTime = System.currentTimeMillis();
     int fileConterConst = fileCounter;
     boolean isFailed = false;
@@ -148,8 +148,6 @@ public class IntermediateFileMerger implements Callable<Void> {
         }
       }
     }
-
-    return null;
   }
 
   /**
@@ -358,6 +356,8 @@ public class IntermediateFileMerger implements Callable<Void> {
               stream.writeInt(bigDecimalInBytes.length);
               stream.write(bigDecimalInBytes);
               break;
+            default:
+              throw new IllegalArgumentException("unsupported data type:" + aggType[counter]);
           }
         } else {
           stream.write((byte) 0);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06b0d081/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
index fc575b6..71fc564 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
@@ -25,7 +25,6 @@ import java.io.IOException;
 import java.math.BigDecimal;
 import java.util.Arrays;
 import java.util.Random;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Semaphore;
@@ -127,7 +126,7 @@ public class SortDataRows {
       Object[][] recordHolderListLocal = recordHolderList;
       try {
         semaphore.acquire();
-        dataSorterAndWriterExecutorService.submit(new DataSorterAndWriter(recordHolderListLocal));
+        dataSorterAndWriterExecutorService.execute(new DataSorterAndWriter(recordHolderListLocal));
       } catch (InterruptedException e) {
         LOGGER.error(
             "exception occurred while trying to acquire a semaphore lock: " + e.getMessage());
@@ -163,7 +162,8 @@ public class SortDataRows {
         }
         try {
           semaphore.acquire();
-          dataSorterAndWriterExecutorService.submit(new DataSorterAndWriter(recordHolderListLocal));
+          dataSorterAndWriterExecutorService
+              .execute(new DataSorterAndWriter(recordHolderListLocal));
         } catch (Exception e) {
           LOGGER.error(
               "exception occurred while trying to acquire a semaphore lock: " + e.getMessage());
@@ -310,6 +310,8 @@ public class SortDataRows {
                 stream.writeInt(bigDecimalInBytes.length);
                 stream.write(bigDecimalInBytes);
                 break;
+              default:
+                throw new IllegalArgumentException("unsupported data type:" + type[mesCount]);
             }
           } else {
             stream.write((byte) 0);
@@ -389,14 +391,15 @@ public class SortDataRows {
    * This class is responsible for sorting and writing the object
    * array which holds the records equal to given array size
    */
-  private class DataSorterAndWriter implements Callable<Void> {
+  private class DataSorterAndWriter implements Runnable {
     private Object[][] recordHolderArray;
 
     public DataSorterAndWriter(Object[][] recordHolderArray) {
       this.recordHolderArray = recordHolderArray;
     }
 
-    @Override public Void call() throws Exception {
+    @Override
+    public void run() {
       try {
         long startTime = System.currentTimeMillis();
         if (parameters.getNumberOfNoDictSortColumns() > 0) {
@@ -420,11 +423,14 @@ public class SortDataRows {
         LOGGER.info("Time taken to sort and write sort temp file " + sortTempFile + " is: " + (
             System.currentTimeMillis() - startTime));
       } catch (Throwable e) {
-        threadStatusObserver.notifyFailed(e);
+        try {
+          threadStatusObserver.notifyFailed(e);
+        } catch (CarbonSortKeyAndGroupByException ex) {
+          LOGGER.error(ex);
+        }
       } finally {
         semaphore.release();
       }
-      return null;
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06b0d081/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortIntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortIntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortIntermediateFileMerger.java
index 383498c..6bda88a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortIntermediateFileMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortIntermediateFileMerger.java
@@ -91,7 +91,7 @@ public class SortIntermediateFileMerger {
         chosenTempDir + File.separator + parameters.getTableName() + System
             .nanoTime() + CarbonCommonConstants.MERGERD_EXTENSION);
     IntermediateFileMerger merger = new IntermediateFileMerger(parameters, intermediateFiles, file);
-    executorService.submit(merger);
+    executorService.execute(merger);
   }
 
   public void finish() throws CarbonSortKeyAndGroupByException {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06b0d081/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java
index 9732e66..10b3ad5 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java
@@ -359,6 +359,8 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
               stream.readFully(buff);
               measures[index++] = DataTypeUtil.byteToBigDecimal(buff);
               break;
+            default:
+              throw new IllegalArgumentException("unsupported data type:" + aggType[i]);
           }
         } else {
           measures[index++] = null;
@@ -448,14 +450,16 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
   }
 
   @Override public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+
     if (!(obj instanceof SortTempFileChunkHolder)) {
       return false;
     }
     SortTempFileChunkHolder o = (SortTempFileChunkHolder) obj;
 
-
-
-    return o.compareTo(o) == 0;
+    return this == o;
   }
 
   @Override public int hashCode() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06b0d081/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index a716340..251b62e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -190,14 +190,14 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
       }
     }
     this.version = CarbonProperties.getInstance().getFormatVersion();
-    String noInvertedIdxCol = "";
+    StringBuffer noInvertedIdxCol = new StringBuffer();
     for (CarbonDimension cd : model.getSegmentProperties().getDimensions()) {
       if (!cd.isUseInvertedIndex()) {
-        noInvertedIdxCol += (cd.getColName() + ",");
+        noInvertedIdxCol.append(cd.getColName()).append(",");
       }
     }
 
-    LOGGER.info("Columns considered as NoInverted Index are " + noInvertedIdxCol);
+    LOGGER.info("Columns considered as NoInverted Index are " + noInvertedIdxCol.toString());
   }
 
   private void initParameters(CarbonFactDataHandlerModel model) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06b0d081/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
index 6178cfb..48227d1 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
@@ -24,7 +24,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.PriorityQueue;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -198,27 +197,28 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
 
     for (final File tempFile : files) {
 
-      Callable<Void> runnable = new Callable<Void>() {
-        @Override public Void call() throws CarbonSortKeyAndGroupByException {
-          // create chunk holder
-          SortTempFileChunkHolder sortTempFileChunkHolder =
-              new SortTempFileChunkHolder(tempFile, dimensionCount, complexDimensionCount,
-                  measureCount, fileBufferSize, noDictionaryCount, measureDataType,
-                  isNoDictionaryColumn, isNoDictionarySortColumn);
-
-          // initialize
-          sortTempFileChunkHolder.initialize();
-          sortTempFileChunkHolder.readRow();
+      Runnable runnable = new Runnable() {
+        @Override public void run() {
+
+            // create chunk holder
+            SortTempFileChunkHolder sortTempFileChunkHolder =
+                new SortTempFileChunkHolder(tempFile, dimensionCount, complexDimensionCount,
+                    measureCount, fileBufferSize, noDictionaryCount, measureDataType,
+                    isNoDictionaryColumn, isNoDictionarySortColumn);
+          try {
+            // initialize
+            sortTempFileChunkHolder.initialize();
+            sortTempFileChunkHolder.readRow();
+          } catch (CarbonSortKeyAndGroupByException ex) {
+            LOGGER.error(ex);
+          }
 
           synchronized (LOCKOBJECT) {
             recordHolderHeapLocal.add(sortTempFileChunkHolder);
           }
-
-          // add to heap
-          return null;
         }
       };
-      service.submit(runnable);
+      service.execute(runnable);
     }
     service.shutdown();
 


Mime
View raw message