carbondata-commits mailing list archives

From jack...@apache.org
Subject [1/5] incubator-carbondata git commit: Data load integration of all steps for removing kettle
Date Thu, 10 Nov 2016 14:49:23 GMT
Repository: incubator-carbondata
Updated Branches:
  refs/heads/master ae9d88b6c -> e30853975


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
index a55e1ba..3a6afc7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
@@ -151,11 +151,19 @@ public class SortDataRows {
       toSort = new Object[entryCount][];
       System.arraycopy(recordHolderList, 0, toSort, 0, entryCount);
 
-      if (parameters.getNoDictionaryCount() > 0) {
-        Arrays.sort(toSort, new RowComparator(parameters.getNoDictionaryDimnesionColumn(),
-            parameters.getNoDictionaryCount()));
+      if (parameters.isUseKettle()) {
+        if (parameters.getNoDictionaryCount() > 0) {
+          Arrays.sort(toSort, new RowComparator(parameters.getNoDictionaryDimnesionColumn(),
+              parameters.getNoDictionaryCount()));
+        } else {
+          Arrays.sort(toSort, new RowComparatorForNormalDims(parameters.getDimColCount()));
+        }
       } else {
-        Arrays.sort(toSort, new RowComparatorForNormalDims(parameters.getDimColCount()));
+        if (parameters.getNoDictionaryCount() > 0) {
+          Arrays.sort(toSort, new NewRowComparator(parameters.getNoDictionaryDimnesionColumn()));
+        } else {
+          Arrays.sort(toSort, new NewRowComparatorForNormalDims(parameters.getDimColCount()));
+        }
       }
       recordHolderList = toSort;
 
@@ -183,7 +191,11 @@ public class SortDataRows {
       writeSortTempFile(recordHolderList, entryCountLocal, file);
       return;
     }
-    writeData(recordHolderList, entryCountLocal, file);
+    if (parameters.isUseKettle()) {
+      writeData(recordHolderList, entryCountLocal, file);
+    } else {
+      writeDataWithOutKettle(recordHolderList, entryCountLocal, file);
+    }
   }
 
   private void writeSortTempFile(Object[][] recordHolderList, int entryCountLocal, File file)
@@ -204,6 +216,7 @@ public class SortDataRows {
     }
   }
 
+  // TODO Remove after kettle flow is removed
   private void writeData(Object[][] recordHolderList, int entryCountLocal, File file)
       throws CarbonSortKeyAndGroupByException {
     DataOutputStream stream = null;
@@ -264,6 +277,72 @@ public class SortDataRows {
     }
   }
 
+  private void writeDataWithOutKettle(Object[][] recordHolderList, int entryCountLocal, File file)
+      throws CarbonSortKeyAndGroupByException {
+    DataOutputStream stream = null;
+    try {
+      // open stream
+      stream = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(file),
+          parameters.getFileWriteBufferSize()));
+
+      // write number of entries to the file
+      stream.writeInt(entryCountLocal);
+      int complexDimColCount = parameters.getComplexDimColCount();
+      int dimColCount = parameters.getDimColCount() + complexDimColCount;
+      char[] aggType = parameters.getAggType();
+      boolean[] noDictionaryDimnesionMapping = parameters.getNoDictionaryDimnesionColumn();
+      Object[] row = null;
+      for (int i = 0; i < entryCountLocal; i++) {
+        // get row from record holder list
+        row = recordHolderList[i];
+        int dimCount = 0;
+        // write dictionary and non dictionary dimensions here.
+        for (; dimCount < noDictionaryDimnesionMapping.length; dimCount++) {
+          if (noDictionaryDimnesionMapping[dimCount]) {
+            byte[] col = (byte[]) row[dimCount];
+            stream.writeShort(col.length);
+            stream.write(col);
+          } else {
+            stream.writeInt((int)row[dimCount]);
+          }
+        }
+        // write complex dimensions here.
+        for (; dimCount < dimColCount; dimCount++) {
+          byte[] value = (byte[])row[dimCount];
+          stream.writeShort(value.length);
+          stream.write(value);
+        }
+        // as measures are stored in a separate array.
+        for (int mesCount = 0;
+             mesCount < parameters.getMeasureColCount(); mesCount++) {
+          Object value = row[mesCount + dimColCount];
+          if (null != value) {
+            stream.write((byte) 1);
+            if (aggType[mesCount] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) {
+              Double val = (Double) value;
+              stream.writeDouble(val);
+            } else if (aggType[mesCount] == CarbonCommonConstants.BIG_INT_MEASURE) {
+              Long val = (Long) value;
+              stream.writeLong(val);
+            } else if (aggType[mesCount] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
+              BigDecimal val = (BigDecimal) value;
+              byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(val);
+              stream.writeInt(bigDecimalInBytes.length);
+              stream.write(bigDecimalInBytes);
+            }
+          } else {
+            stream.write((byte) 0);
+          }
+        }
+      }
+    } catch (IOException e) {
+      throw new CarbonSortKeyAndGroupByException("Problem while writing the file", e);
+    } finally {
+      // close streams
+      CarbonUtil.closeStreams(stream);
+    }
+  }
+
   private TempSortFileWriter getWriter() {
     TempSortFileWriter chunkWriter = null;
     TempSortFileWriter writer = TempSortFileWriterFactory.getInstance()
@@ -339,14 +418,25 @@ public class SortDataRows {
     @Override public Void call() throws Exception {
       try {
         long startTime = System.currentTimeMillis();
-        if (parameters.getNoDictionaryCount() > 0) {
-          Arrays.sort(recordHolderArray,
-              new RowComparator(parameters.getNoDictionaryDimnesionColumn(),
-                  parameters.getNoDictionaryCount()));
+        if (parameters.isUseKettle()) {
+          if (parameters.getNoDictionaryCount() > 0) {
+            Arrays.sort(recordHolderArray,
+                new RowComparator(parameters.getNoDictionaryDimnesionColumn(),
+                    parameters.getNoDictionaryCount()));
+          } else {
+            Arrays.sort(recordHolderArray,
+                new RowComparatorForNormalDims(parameters.getDimColCount()));
+          }
         } else {
-          Arrays
-              .sort(recordHolderArray, new RowComparatorForNormalDims(parameters.getDimColCount()));
+          if (parameters.getNoDictionaryCount() > 0) {
+            Arrays.sort(recordHolderArray,
+                new NewRowComparator(parameters.getNoDictionaryDimnesionColumn()));
+          } else {
+            Arrays.sort(recordHolderArray,
+                new NewRowComparatorForNormalDims(parameters.getDimColCount()));
+          }
         }
+
         // create a new file every time
         File sortTempFile = new File(
             parameters.getTempFileLocation() + File.separator + parameters.getTableName() + System
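
For reference, the row layout that writeDataWithOutKettle produces is the same one getRowFromStreamWithOutKettle in SortTempFileChunkHolder (below) reads back: raw ints for dictionary surrogates, length-prefixed bytes for no-dictionary and complex dimensions, and a one-byte null marker ahead of each measure. A minimal, self-contained round-trip sketch of that format (class name and values are illustrative, not part of the patch):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class SortTempRowFormatSketch {
  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(buffer);

    // One row: a dictionary dimension, a no-dictionary dimension and a
    // nullable double measure, in the order the writer emits them.
    out.writeInt(42);                              // dictionary surrogate key
    byte[] noDict = "apple".getBytes("UTF-8");
    out.writeShort(noDict.length);                 // 2-byte length prefix
    out.write(noDict);                             // raw no-dictionary bytes
    out.writeByte(1);                              // 1 = measure present, 0 = null
    out.writeDouble(3.14);                         // SUM_COUNT_VALUE_MEASURE payload
    out.close();

    DataInputStream in =
        new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()));
    int surrogate = in.readInt();
    byte[] read = new byte[in.readShort()];
    in.readFully(read);
    double measure = in.readByte() == 1 ? in.readDouble() : Double.NaN;
    System.out.println(surrogate + " " + new String(read, "UTF-8") + " " + measure);
  }
}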

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
index e7bc252..6254cce 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
@@ -19,18 +19,12 @@
 package org.apache.carbondata.processing.sortandgroupby.sortdata;
 
 import java.io.File;
-import java.util.Arrays;
-import java.util.List;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
-import org.apache.carbondata.core.carbon.metadata.CarbonMetadata;
-import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
 import org.apache.carbondata.processing.schema.metadata.SortObserver;
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
@@ -122,6 +116,11 @@ public class SortParameters {
 
   private int numberOfCores;
 
+  /**
+   * TODO Temporary conf, it will be removed after kettle removal.
+   */
+  private boolean useKettle = true;
+
   public String getTempFileLocation() {
     return tempFileLocation;
   }
@@ -298,6 +297,14 @@ public class SortParameters {
     this.numberOfCores = numberOfCores;
   }
 
+  public boolean isUseKettle() {
+    return useKettle;
+  }
+
+  public void setUseKettle(boolean useKettle) {
+    this.useKettle = useKettle;
+  }
+
   public static SortParameters createSortParameters(CarbonDataLoadConfiguration configuration) {
     SortParameters parameters = new SortParameters();
     CarbonTableIdentifier tableIdentifier =
@@ -394,16 +401,11 @@ public class SortParameters {
     parameters.setPrefetch(CarbonCommonConstants.CARBON_PREFETCH_IN_MERGE_VALUE);
     parameters.setBufferSize(CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE);
 
-    char[] aggType = new char[parameters.getMeasureColCount()];
-    Arrays.fill(aggType, 'n');
-    CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(
-        parameters.getDatabaseName() + CarbonCommonConstants.UNDERSCORE + parameters
-            .getTableName());
-    List<CarbonMeasure> measures = carbonTable.getMeasureByTableName(parameters.getTableName());
-    for (int i = 0; i < aggType.length; i++) {
-      aggType[i] = DataTypeUtil.getAggType(measures.get(i).getDataType());
-    }
+    char[] aggType = CarbonDataProcessorUtil
+        .getAggType(parameters.getMeasureColCount(), parameters.getDatabaseName(),
+            parameters.getTableName());
     parameters.setAggType(aggType);
+    parameters.setUseKettle(false);
     return parameters;
   }
 
@@ -500,15 +502,9 @@ public class SortParameters {
     parameters.setPrefetch(CarbonCommonConstants.CARBON_PREFETCH_IN_MERGE_VALUE);
     parameters.setBufferSize(CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE);
 
-    char[] aggType = new char[parameters.getMeasureColCount()];
-    Arrays.fill(aggType, 'n');
-    CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(
-        parameters.getDatabaseName() + CarbonCommonConstants.UNDERSCORE + parameters
-            .getTableName());
-    List<CarbonMeasure> measures = carbonTable.getMeasureByTableName(parameters.getTableName());
-    for (int i = 0; i < aggType.length; i++) {
-      aggType[i] = DataTypeUtil.getAggType(measures.get(i).getDataType());
-    }
+    char[] aggType = CarbonDataProcessorUtil
+        .getAggType(parameters.getMeasureColCount(), parameters.getDatabaseName(),
+            parameters.getTableName());
     parameters.setAggType(aggType);
     return parameters;
   }
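
Both createSortParameters overloads previously derived the measure aggregation types with the same inline loop; the patch moves it behind CarbonDataProcessorUtil.getAggType. Reconstructed from the deleted lines, the extracted helper plausibly looks like the following sketch (the committed body may differ in detail):

import java.util.Arrays;
import java.util.List;

import org.apache.carbondata.core.carbon.metadata.CarbonMetadata;
import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonMeasure;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.util.DataTypeUtil;

public final class AggTypeHelperSketch {
  // Reconstructed from the inline code this patch deletes from SortParameters.
  public static char[] getAggType(int measureCount, String databaseName, String tableName) {
    char[] aggType = new char[measureCount];
    Arrays.fill(aggType, 'n'); // default aggregator type
    CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(
        databaseName + CarbonCommonConstants.UNDERSCORE + tableName);
    List<CarbonMeasure> measures = carbonTable.getMeasureByTableName(tableName);
    for (int i = 0; i < aggType.length; i++) {
      aggType[i] = DataTypeUtil.getAggType(measures.get(i).getDataType());
    }
    return aggType;
  }
}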

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java
index a594353..ea939a8 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java
@@ -136,6 +136,9 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
    */
   private boolean[] isNoDictionaryDimensionColumn;
 
+  // TODO temporary configuration, remove after kettle removal
+  private boolean useKettle;
+
   /**
    * Constructor to initialize
    *
@@ -150,7 +153,7 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
    */
   public SortTempFileChunkHolder(File tempFile, int dimensionCount, int complexDimensionCount,
       int measureCount, int fileBufferSize, int noDictionaryCount, char[] aggType,
-      boolean[] isNoDictionaryDimensionColumn) {
+      boolean[] isNoDictionaryDimensionColumn, boolean useKettle) {
     // set temp file
     this.tempFile = tempFile;
 
@@ -165,6 +168,7 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
     this.executorService = Executors.newFixedThreadPool(1);
     this.aggType = aggType;
     this.isNoDictionaryDimensionColumn = isNoDictionaryDimensionColumn;
+    this.useKettle = useKettle;
   }
 
   /**
@@ -302,7 +306,15 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
    */
   private Object[] getRowFromStream() throws CarbonSortKeyAndGroupByException {
     // create new row of size 3 (1 for dims, 1 for high card, 1 for measures)
+    if (useKettle) {
+      return getRowFromStreamWithKettle();
+    } else {
+      return getRowFromStreamWithOutKettle();
+    }
+  }
 
+  // TODO remove after kettle flow is removed
+  private Object[] getRowFromStreamWithKettle() throws CarbonSortKeyAndGroupByException {
     Object[] holder = new Object[3];
     int index = 0;
     Integer[] dim = new Integer[this.dimensionCount];
@@ -361,6 +373,72 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
   }
 
   /**
+   * Reads row from file
+   * @return Object[]
+   * @throws CarbonSortKeyAndGroupByException
+   */
+  private Object[] getRowFromStreamWithOutKettle() throws CarbonSortKeyAndGroupByException {
+    // create new row of size 3 (1 for dims, 1 for high card, 1 for measures)
+
+    Object[] holder = new Object[3];
+    int index = 0;
+    int nonDicIndex = 0;
+    int[] dim = new int[this.dimensionCount];
+    byte[][] nonDicArray = new byte[this.noDictionaryCount + this.complexDimensionCount][];
+    Object[] measures = new Object[this.measureCount];
+    try {
+      // read dimension values
+      for (int i = 0; i < isNoDictionaryDimensionColumn.length; i++) {
+        if (isNoDictionaryDimensionColumn[i]) {
+          short len = stream.readShort();
+          byte[] array = new byte[len];
+          stream.readFully(array);
+          nonDicArray[nonDicIndex++] = array;
+        } else {
+          dim[index++] = stream.readInt();
+        }
+      }
+
+      for (int i = 0; i < complexDimensionCount; i++) {
+        short len = stream.readShort();
+        byte[] array = new byte[len];
+        stream.readFully(array);
+        nonDicArray[nonDicIndex++] = array;
+      }
+
+      index = 0;
+      // read measure values
+      for (int i = 0; i < this.measureCount; i++) {
+        if (stream.readByte() == 1) {
+          if (aggType[i] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) {
+            measures[index++] = stream.readDouble();
+          } else if (aggType[i] == CarbonCommonConstants.BIG_INT_MEASURE) {
+            measures[index++] = stream.readLong();
+          } else {
+            int len = stream.readInt();
+            byte[] buff = new byte[len];
+            stream.readFully(buff);
+            measures[index++] = buff;
+          }
+        } else {
+          measures[index++] = null;
+        }
+      }
+
+      RemoveDictionaryUtil.prepareOutObj(holder, dim, nonDicArray, measures);
+
+      // increment number of records read
+      this.numberOfObjectRead++;
+    } catch (IOException e) {
+      LOGGER.error("Problem while reading the mdkey from sort temp file");
+      throw new CarbonSortKeyAndGroupByException("Problem while reading the sort temp file ", e);
+    }
+
+    //return out row
+    return holder;
+  }
+
+  /**
    * below method will be used to get the row
    *
    * @return row
@@ -400,11 +478,16 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
   }
 
   @Override public int compareTo(SortTempFileChunkHolder other) {
+    if (useKettle) {
+      return compareWithKettle(other);
 
-    return compare(other);
+    } else {
+      return compareWithOutKettle(other);
+    }
   }
 
-  private int compare(SortTempFileChunkHolder other) {
+  // TODO Remove after kettle flow is removed.
+  private int compareWithKettle(SortTempFileChunkHolder other) {
     int diff = 0;
 
     int normalIndex = 0;
@@ -448,6 +531,34 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
     return diff;
   }
 
+  private int compareWithOutKettle(SortTempFileChunkHolder other) {
+    int diff = 0;
+    int index = 0;
+    int noDictionaryIndex = 0;
+    int[] leftMdkArray = (int[]) returnRow[0];
+    int[] rightMdkArray = (int[]) other.returnRow[0];
+    byte[][] leftNonDictArray = (byte[][]) returnRow[1];
+    byte[][] rightNonDictArray = (byte[][]) other.returnRow[1];
+    for (boolean isNoDictionary : isNoDictionaryDimensionColumn) {
+      if (isNoDictionary) {
+        diff = UnsafeComparer.INSTANCE
+            .compareTo(leftNonDictArray[noDictionaryIndex], rightNonDictArray[noDictionaryIndex]);
+        if (diff != 0) {
+          return diff;
+        }
+        noDictionaryIndex++;
+      } else {
+        diff = leftMdkArray[index] - rightMdkArray[index];
+        if (diff != 0) {
+          return diff;
+        }
+        index++;
+      }
+
+    }
+    return diff;
+  }
+
   @Override public boolean equals(Object obj) {
     if (!(obj instanceof SortTempFileChunkHolder)) {
       return false;
@@ -456,7 +567,7 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
 
 
 
-    return o.compare(o) == 0;
+    return o.compareTo(o) == 0;
   }
 
   @Override public int hashCode() {
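
SortTempFileChunkHolder implements Comparable so that SingleThreadFinalSortFilesMerger (further below) can k-way merge the sorted temp files through a heap keyed on each holder's current row. A generic, simplified sketch of that merge pattern, with plain ints standing in for rows (not code from the patch):

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

public class KWayMergeSketch {
  // One cursor per sorted temp run; compareTo orders cursors by their head
  // row, mirroring SortTempFileChunkHolder's role in the merger.
  static final class Cursor implements Comparable<Cursor> {
    final Iterator<Integer> rows;
    int current;
    Cursor(Iterator<Integer> rows) { this.rows = rows; this.current = rows.next(); }
    boolean advance() { if (!rows.hasNext()) return false; current = rows.next(); return true; }
    @Override public int compareTo(Cursor other) { return Integer.compare(current, other.current); }
  }

  public static void main(String[] args) {
    List<List<Integer>> sortedRuns = Arrays.asList(
        Arrays.asList(1, 4, 9), Arrays.asList(2, 3, 8), Arrays.asList(5, 6, 7));
    PriorityQueue<Cursor> heap = new PriorityQueue<Cursor>();
    for (List<Integer> run : sortedRuns) heap.add(new Cursor(run.iterator()));
    while (!heap.isEmpty()) {
      Cursor smallest = heap.poll();              // run with the smallest head row
      System.out.print(smallest.current + " ");   // emit next row in merged order
      if (smallest.advance()) heap.add(smallest); // re-queue if the run has more rows
    }
  }
}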

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index 8dd5ce2..f3bd484 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -271,6 +271,8 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
    */
   private boolean compactionFlow;
 
+  private boolean useKettle;
+
   /**
    * CarbonFactDataHandler constructor
    */
@@ -279,6 +281,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     this.dimensionCount = carbonFactDataHandlerModel.getDimensionCount();
     this.complexIndexMap = carbonFactDataHandlerModel.getComplexIndexMap();
     this.primitiveDimLens = carbonFactDataHandlerModel.getPrimitiveDimLens();
+    this.useKettle = carbonFactDataHandlerModel.isUseKettle();
     this.isAggKeyBlock = Boolean.parseBoolean(CarbonProperties.getInstance()
         .getProperty(CarbonCommonConstants.AGGREAGATE_COLUMNAR_KEY_BLOCK,
             CarbonCommonConstants.AGGREAGATE_COLUMNAR_KEY_BLOCK_DEFAULTVALUE));
@@ -470,11 +473,12 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
         this.entryCount = 0;
       } catch (InterruptedException e) {
         LOGGER.error(e, e.getMessage());
-        throw new CarbonDataWriterException(e.getMessage());
+        throw new CarbonDataWriterException(e.getMessage(), e);
       }
     }
   }
 
+  // TODO remove after kettle flow is removed
   private NodeHolder processDataRows(List<Object[]> dataRows) throws CarbonDataWriterException {
     Object[] max = new Object[measureCount];
     Object[] min = new Object[measureCount];
@@ -611,6 +615,143 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     return nodeHolder;
   }
 
+  private NodeHolder processDataRowsWithOutKettle(List<Object[]> dataRows)
+      throws CarbonDataWriterException {
+    Object[] max = new Object[measureCount];
+    Object[] min = new Object[measureCount];
+    int[] decimal = new int[measureCount];
+    Object[] uniqueValue = new Object[measureCount];
+    // to store index of the measure columns which are null
+    BitSet[] nullValueIndexBitSet = getMeasureNullValueIndexBitSet(measureCount);
+    for (int i = 0; i < max.length; i++) {
+      if (type[i] == CarbonCommonConstants.BIG_INT_MEASURE) {
+        max[i] = Long.MIN_VALUE;
+      } else if (type[i] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) {
+        max[i] = -Double.MAX_VALUE;
+      } else if (type[i] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
+        max[i] = new BigDecimal(0.0);
+      } else {
+        max[i] = 0.0;
+      }
+    }
+    for (int i = 0; i < min.length; i++) {
+      if (type[i] == CarbonCommonConstants.BIG_INT_MEASURE) {
+        min[i] = Long.MAX_VALUE;
+        uniqueValue[i] = Long.MIN_VALUE;
+      } else if (type[i] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) {
+        min[i] = Double.MAX_VALUE;
+        uniqueValue[i] = Double.MIN_VALUE;
+      } else if (type[i] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
+        min[i] = new BigDecimal(Double.MAX_VALUE);
+        uniqueValue[i] = new BigDecimal(Double.MIN_VALUE);
+      } else {
+        min[i] = 0.0;
+        uniqueValue[i] = 0.0;
+      }
+    }
+    for (int i = 0; i < decimal.length; i++) {
+      decimal[i] = 0;
+    }
+
+    byte[] startKey = null;
+    byte[] endKey = null;
+    byte[][] noDictStartKey = null;
+    byte[][] noDictEndKey = null;
+    CarbonWriteDataHolder[] dataHolder = initialiseDataHolder(dataRows.size());
+    CarbonWriteDataHolder keyDataHolder = initialiseKeyBlockHolderWithOutKettle(dataRows.size());
+    CarbonWriteDataHolder noDictionaryKeyDataHolder = null;
+    if ((noDictionaryCount + complexColCount) > 0) {
+      noDictionaryKeyDataHolder = initialiseKeyBlockHolderForNonDictionary(dataRows.size());
+    }
+
+    for (int count = 0; count < dataRows.size(); count++) {
+      Object[] row = dataRows.get(count);
+      byte[] mdKey = (byte[]) row[this.mdKeyIndex];
+      byte[][] noDictionaryKey = null;
+      if (noDictionaryCount > 0 || complexIndexMap.size() > 0) {
+        noDictionaryKey = (byte[][]) row[this.mdKeyIndex - 1];
+      }
+      ByteBuffer byteBuffer = null;
+      byte[] b = null;
+      if (count == 0) {
+        startKey = mdKey;
+        noDictStartKey = noDictionaryKey;
+      }
+      endKey = mdKey;
+      noDictEndKey = noDictionaryKey;
+      // add to key store
+      if (mdKey.length > 0) {
+        keyDataHolder.setWritableByteArrayValueByIndex(count, mdKey);
+      }
+      // for storing the byte[] for high cardinality columns.
+      if (noDictionaryCount > 0 || complexIndexMap.size() > 0) {
+        noDictionaryKeyDataHolder.setWritableNonDictByteArrayValueByIndex(count, noDictionaryKey);
+      }
+
+      for (int k = 0; k < otherMeasureIndex.length; k++) {
+        if (type[otherMeasureIndex[k]] == CarbonCommonConstants.BIG_INT_MEASURE) {
+          if (null == row[otherMeasureIndex[k]]) {
+            nullValueIndexBitSet[otherMeasureIndex[k]].set(count);
+            dataHolder[otherMeasureIndex[k]].setWritableLongValueByIndex(count, 0L);
+          } else {
+            dataHolder[otherMeasureIndex[k]]
+                .setWritableLongValueByIndex(count, row[otherMeasureIndex[k]]);
+          }
+        } else {
+          if (null == row[otherMeasureIndex[k]]) {
+            nullValueIndexBitSet[otherMeasureIndex[k]].set(count);
+            dataHolder[otherMeasureIndex[k]].setWritableDoubleValueByIndex(count, 0.0);
+          } else {
+            dataHolder[otherMeasureIndex[k]]
+                .setWritableDoubleValueByIndex(count, row[otherMeasureIndex[k]]);
+          }
+        }
+      }
+      calculateMaxMin(max, min, decimal, otherMeasureIndex, row);
+      for (int i = 0; i < customMeasureIndex.length; i++) {
+        if (null == row[customMeasureIndex[i]]
+            && type[customMeasureIndex[i]] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
+          BigDecimal val = BigDecimal.valueOf(0);
+          b = DataTypeUtil.bigDecimalToByte(val);
+          nullValueIndexBitSet[customMeasureIndex[i]].set(count);
+        } else {
+          if (this.compactionFlow) {
+            BigDecimal bigDecimal = ((Decimal) row[customMeasureIndex[i]]).toJavaBigDecimal();
+            b = DataTypeUtil.bigDecimalToByte(bigDecimal);
+          } else {
+            b = (byte[]) row[customMeasureIndex[i]];
+          }
+        }
+        byteBuffer = ByteBuffer.allocate(b.length + CarbonCommonConstants.INT_SIZE_IN_BYTE);
+        byteBuffer.putInt(b.length);
+        byteBuffer.put(b);
+        byteBuffer.flip();
+        b = byteBuffer.array();
+        dataHolder[customMeasureIndex[i]].setWritableByteArrayValueByIndex(count, b);
+      }
+      calculateMaxMin(max, min, decimal, customMeasureIndex, row);
+    }
+    calculateUniqueValue(min, uniqueValue);
+    byte[][] byteArrayValues = keyDataHolder.getByteArrayValues().clone();
+    byte[][][] noDictionaryValueHolder = null;
+    if ((noDictionaryCount + complexColCount) > 0) {
+      noDictionaryValueHolder = noDictionaryKeyDataHolder.getNonDictByteArrayValues();
+    }
+    ValueCompressionModel compressionModel = ValueCompressionUtil
+        .getValueCompressionModel(max, min, decimal, uniqueValue, type, new byte[max.length]);
+    byte[][] writableMeasureDataArray =
+        StoreFactory.createDataStore(compressionModel).getWritableMeasureDataArray(dataHolder)
+            .clone();
+    NodeHolder nodeHolder =
+        getNodeHolderObjectWithOutKettle(writableMeasureDataArray, byteArrayValues, dataRows.size(),
+            startKey, endKey, compressionModel, noDictionaryValueHolder, noDictStartKey,
+            noDictEndKey);
+    nodeHolder.setMeasureNullValueIndex(nullValueIndexBitSet);
+    LOGGER.info("Number Of records processed: " + dataRows.size());
+    return nodeHolder;
+  }
+
+  // TODO remove after kettle flow is removed
   private NodeHolder getNodeHolderObject(byte[][] dataHolderLocal, byte[][] byteArrayValues,
       int entryCountLocal, byte[] startkeyLocal, byte[] endKeyLocal,
       ValueCompressionModel compressionModel, byte[][] noDictionaryData,
@@ -738,6 +879,142 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
             endKeyLocal, compressionModel, noDictionaryStartKey, noDictionaryEndKey);
   }
 
+  private NodeHolder getNodeHolderObjectWithOutKettle(byte[][] dataHolderLocal,
+      byte[][] byteArrayValues, int entryCountLocal, byte[] startkeyLocal, byte[] endKeyLocal,
+      ValueCompressionModel compressionModel, byte[][][] noDictionaryData,
+      byte[][] noDictionaryStartKey, byte[][] noDictionaryEndKey)
+      throws CarbonDataWriterException {
+    byte[][][] noDictionaryColumnsData = null;
+    List<ArrayList<byte[]>> colsAndValues = new ArrayList<ArrayList<byte[]>>();
+    int complexColCount = getComplexColsCount();
+
+    for (int i = 0; i < complexColCount; i++) {
+      colsAndValues.add(new ArrayList<byte[]>());
+    }
+    int noOfColumn = colGrpModel.getNoOfColumnStore();
+    DataHolder[] dataHolders = getDataHolders(noOfColumn, byteArrayValues.length);
+    for (int i = 0; i < byteArrayValues.length; i++) {
+      byte[][] splitKey = columnarSplitter.splitKey(byteArrayValues[i]);
+
+      for (int j = 0; j < splitKey.length; j++) {
+        dataHolders[j].addData(splitKey[j], i);
+      }
+    }
+    if (noDictionaryCount > 0 || complexIndexMap.size() > 0) {
+      noDictionaryColumnsData = new byte[noDictionaryCount][noDictionaryData.length][];
+      for (int i = 0; i < noDictionaryData.length; i++) {
+        int complexColumnIndex = primitiveDimLens.length + noDictionaryCount;
+        byte[][] splitKey = noDictionaryData[i];
+
+        int complexTypeIndex = 0;
+        for (int j = 0; j < splitKey.length; j++) {
+          // no-dictionary columns
+          if (j < noDictionaryCount) {
+            int keyLength = splitKey[j].length;
+            byte[] newKey = new byte[keyLength + 2];
+            ByteBuffer buffer = ByteBuffer.wrap(newKey);
+            buffer.putShort((short)keyLength);
+            System.arraycopy(splitKey[j], 0, newKey, 2, keyLength);
+            noDictionaryColumnsData[j][i] = newKey;
+          }
+          //complex types
+          else {
+            // Need to write columnar block from complex byte array
+            int index = complexColumnIndex - noDictionaryCount;
+            GenericDataType complexDataType = complexIndexMap.get(index);
+            complexColumnIndex++;
+            if (complexDataType != null) {
+              List<ArrayList<byte[]>> columnsArray = new ArrayList<ArrayList<byte[]>>();
+              for (int k = 0; k < complexDataType.getColsCount(); k++) {
+                columnsArray.add(new ArrayList<byte[]>());
+              }
+
+              try {
+                ByteBuffer byteArrayInput = ByteBuffer.wrap(splitKey[j]);
+                ByteArrayOutputStream byteArrayOutput = new ByteArrayOutputStream();
+                DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutput);
+                complexDataType
+                    .parseAndBitPack(byteArrayInput, dataOutputStream, this.complexKeyGenerator);
+                complexDataType.getColumnarDataForComplexType(columnsArray,
+                    ByteBuffer.wrap(byteArrayOutput.toByteArray()));
+                byteArrayOutput.close();
+              } catch (IOException e) {
+                throw new CarbonDataWriterException(
+                    "Problem while bit packing and writing complex datatype", e);
+              } catch (KeyGenException e) {
+                throw new CarbonDataWriterException(
+                    "Problem while bit packing and writing complex datatype", e);
+              }
+
+              for (ArrayList<byte[]> eachColumn : columnsArray) {
+                colsAndValues.get(complexTypeIndex++).addAll(eachColumn);
+              }
+            } else {
+              // This case is not possible as complex types are the last columns
+            }
+          }
+        }
+      }
+    }
+    thread_pool_size = Integer.parseInt(CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.NUM_CORES_BLOCK_SORT,
+            CarbonCommonConstants.NUM_CORES_BLOCK_SORT_DEFAULT_VAL));
+    ExecutorService executorService = Executors.newFixedThreadPool(thread_pool_size);
+    List<Future<IndexStorage>> submit = new ArrayList<Future<IndexStorage>>(
+        primitiveDimLens.length + noDictionaryCount + complexColCount);
+    int i = 0;
+    int dictionaryColumnCount = -1;
+    int noDictionaryColumnCount = -1;
+    for (i = 0; i < dimensionType.length; i++) {
+      if (dimensionType[i]) {
+        dictionaryColumnCount++;
+        if (colGrpModel.isColumnar(dictionaryColumnCount)) {
+          submit.add(executorService
+              .submit(new BlockSortThread(i, dataHolders[dictionaryColumnCount].getData(),
+                  true, isUseInvertedIndex[i])));
+        } else {
+          submit.add(
+              executorService.submit(new ColGroupBlockStorage(dataHolders[dictionaryColumnCount])));
+        }
+      } else {
+        submit.add(executorService.submit(
+            new BlockSortThread(i, noDictionaryColumnsData[++noDictionaryColumnCount], false, true,
+                true, isUseInvertedIndex[i])));
+      }
+    }
+    for (int k = 0; k < complexColCount; k++) {
+      submit.add(executorService.submit(new BlockSortThread(i++,
+          colsAndValues.get(k).toArray(new byte[colsAndValues.get(k).size()][]), false, true)));
+    }
+    executorService.shutdown();
+    try {
+      executorService.awaitTermination(1, TimeUnit.DAYS);
+    } catch (InterruptedException e) {
+      LOGGER.error(e, e.getMessage());
+    }
+    IndexStorage[] blockStorage =
+        new IndexStorage[colGrpModel.getNoOfColumnStore() + noDictionaryCount + complexColCount];
+    try {
+      for (int k = 0; k < blockStorage.length; k++) {
+        blockStorage[k] = submit.get(k).get();
+      }
+    } catch (Exception e) {
+      LOGGER.error(e, e.getMessage());
+    }
+    byte[] composedNonDictStartKey = null;
+    byte[] composedNonDictEndKey = null;
+    if (noDictionaryStartKey != null) {
+      composedNonDictStartKey =
+          RemoveDictionaryUtil.packByteBufferIntoSingleByteArray(noDictionaryStartKey);
+      composedNonDictEndKey =
+          RemoveDictionaryUtil.packByteBufferIntoSingleByteArray(noDictionaryEndKey);
+    }
+    return this.dataWriter
+        .buildDataNodeHolder(blockStorage, dataHolderLocal, entryCountLocal, startkeyLocal,
+            endKeyLocal, compressionModel, composedNonDictStartKey, composedNonDictEndKey);
+  }
+
+
   /**
    * DataHolder will have all row mdkey data
    *
@@ -812,10 +1089,10 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
         taskList.get(i).get();
       } catch (InterruptedException e) {
         LOGGER.error(e, e.getMessage());
-        throw new CarbonDataWriterException(e.getMessage());
+        throw new CarbonDataWriterException(e.getMessage(), e);
       } catch (ExecutionException e) {
         LOGGER.error(e, e.getMessage());
-        throw new CarbonDataWriterException(e.getMessage());
+        throw new CarbonDataWriterException(e.getMessage(), e);
       }
     }
   }
@@ -883,7 +1160,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
   }
 
   /**
-   * @param double value
+   * @param value
    * @return it return no of value after decimal
    */
   private int getDecimalCount(double value) {
@@ -1065,12 +1342,25 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     return blockKeySizeWithComplexTypes;
   }
 
+  // TODO Remove after kettle flow got removed.
   private CarbonWriteDataHolder initialiseKeyBlockHolder(int size) {
     CarbonWriteDataHolder keyDataHolder = new CarbonWriteDataHolder();
     keyDataHolder.initialiseByteArrayValues(size);
     return keyDataHolder;
   }
 
+  private CarbonWriteDataHolder initialiseKeyBlockHolderWithOutKettle(int size) {
+    CarbonWriteDataHolder keyDataHolder = new CarbonWriteDataHolder();
+    keyDataHolder.initialiseByteArrayValuesWithOutKettle(size);
+    return keyDataHolder;
+  }
+
+  private CarbonWriteDataHolder initialiseKeyBlockHolderForNonDictionary(int size) {
+    CarbonWriteDataHolder keyDataHolder = new CarbonWriteDataHolder();
+    keyDataHolder.initialiseByteArrayValuesForNonDictionary(size);
+    return keyDataHolder;
+  }
+
   private CarbonWriteDataHolder[] initialiseDataHolder(int size) {
     CarbonWriteDataHolder[] dataHolder = new CarbonWriteDataHolder[this.measureCount];
     for (int i = 0; i < otherMeasureIndex.length; i++) {
@@ -1233,7 +1523,12 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
      */
     @Override public Void call() throws Exception {
       try {
-        NodeHolder nodeHolder = processDataRows(dataRows);
+        NodeHolder nodeHolder;
+        if (useKettle) {
+          nodeHolder = processDataRows(dataRows);
+        } else {
+          nodeHolder = processDataRowsWithOutKettle(dataRows);
+        }
         // insert the object in array according to sequence number
         int indexInNodeHolderArray = (sequenceNumber - 1) % numberOfCores;
         blockletDataHolder.put(nodeHolder, indexInNodeHolderArray);
@@ -1241,7 +1536,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
       } catch (Throwable throwable) {
         consumerExecutorService.shutdownNow();
         resetBlockletProcessingCount();
-        throw new CarbonDataWriterException(throwable.getMessage());
+        throw new CarbonDataWriterException(throwable.getMessage(), throwable);
       }
     }
   }
@@ -1276,7 +1571,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
           if (!processingComplete || blockletProcessingCount.get() > 0) {
             producerExecutorService.shutdownNow();
             resetBlockletProcessingCount();
-            throw new CarbonDataWriterException(throwable.getMessage());
+            throw new CarbonDataWriterException(throwable.getMessage(), throwable);
           }
         } finally {
           semaphore.release();
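
Several catch blocks in this file now pass the caught throwable as the cause of the rethrown CarbonDataWriterException instead of only its message. A small sketch of why that matters, using a hypothetical exception class that mirrors the (message, cause) constructor:

public class CauseChainingSketch {
  // Hypothetical stand-in for CarbonDataWriterException's two constructors.
  static final class WriterException extends RuntimeException {
    WriterException(String message) { super(message); }
    WriterException(String message, Throwable cause) { super(message, cause); }
  }

  public static void main(String[] args) {
    try {
      try {
        throw new IllegalStateException("disk full");    // root failure
      } catch (IllegalStateException e) {
        throw new WriterException(e.getMessage(), e);    // cause preserved
      }
    } catch (WriterException e) {
      // Prints a "Caused by: java.lang.IllegalStateException: disk full"
      // section; with the message-only constructor that context is lost.
      e.printStackTrace();
    }
  }
}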

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index eaf0192..c7d9d29 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -19,7 +19,11 @@
 
 package org.apache.carbondata.processing.store;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 
 import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
 import org.apache.carbondata.core.carbon.datastore.block.SegmentProperties;
@@ -178,15 +182,22 @@ public class CarbonFactDataHandlerModel {
   private boolean isCompactionFlow;
 
   /**
+   * To use kettle flow to load or not.
+   */
+  private boolean useKettle = true;
+
+  /**
    * Create the model using @{@link CarbonDataLoadConfiguration}
    * @param configuration
    * @return CarbonFactDataHandlerModel
    */
   public static CarbonFactDataHandlerModel createCarbonFactDataHandlerModel(
-      CarbonDataLoadConfiguration configuration) {
+      CarbonDataLoadConfiguration configuration, String storeLocation) {
 
-    CarbonTableIdentifier tableIdentifier =
+    CarbonTableIdentifier identifier =
         configuration.getTableIdentifier().getCarbonTableIdentifier();
+    CarbonTableIdentifier tableIdentifier =
+        identifier;
     boolean[] isUseInvertedIndex =
         CarbonDataProcessorUtil.getIsUseInvertedIndex(configuration.getDataFields());
 
@@ -206,7 +217,7 @@ public class CarbonFactDataHandlerModel {
     int complexDimensionCount = configuration.getComplexDimensionCount();
     int measureCount = configuration.getMeasureCount();
 
-    int simpleDimsCount = dimensionCount - complexDimensionCount;
+    int simpleDimsCount = dimensionCount - noDictionaryCount - complexDimensionCount;
     int[] simpleDimsLen = new int[simpleDimsCount];
     for (int i = 0; i < simpleDimsCount; i++) {
       simpleDimsLen[i] = dimLens[i];
@@ -251,26 +262,30 @@ public class CarbonFactDataHandlerModel {
 
     CarbonFactDataHandlerModel carbonFactDataHandlerModel = new CarbonFactDataHandlerModel();
     carbonFactDataHandlerModel.setDatabaseName(
-        configuration.getTableIdentifier().getCarbonTableIdentifier().getDatabaseName());
+        identifier.getDatabaseName());
     carbonFactDataHandlerModel
-        .setTableName(configuration.getTableIdentifier().getCarbonTableIdentifier().getTableName());
+        .setTableName(identifier.getTableName());
     carbonFactDataHandlerModel.setMeasureCount(measureCount);
     carbonFactDataHandlerModel.setMdKeyLength(keyGenerator.getKeySizeInBytes());
-    carbonFactDataHandlerModel.setStoreLocation(configuration.getTableIdentifier().getStorePath());
+    carbonFactDataHandlerModel.setStoreLocation(storeLocation);
     carbonFactDataHandlerModel.setDimLens(dimLens);
     carbonFactDataHandlerModel.setNoDictionaryCount(noDictionaryCount);
-    carbonFactDataHandlerModel.setDimensionCount(configuration.getDimensionCount());
+    carbonFactDataHandlerModel
+        .setDimensionCount(configuration.getDimensionCount() - noDictionaryCount);
     carbonFactDataHandlerModel.setComplexIndexMap(complexIndexMap);
     carbonFactDataHandlerModel.setSegmentProperties(segmentProperties);
     carbonFactDataHandlerModel.setColCardinality(colCardinality);
     carbonFactDataHandlerModel.setDataWritingRequest(true);
-    carbonFactDataHandlerModel.setAggType(null);
+    carbonFactDataHandlerModel.setAggType(CarbonDataProcessorUtil
+        .getAggType(measureCount, identifier.getDatabaseName(), identifier.getTableName()));
     carbonFactDataHandlerModel.setFactDimLens(dimLens);
     carbonFactDataHandlerModel.setWrapperColumnSchema(wrapperColumnSchema);
     carbonFactDataHandlerModel.setPrimitiveDimLens(simpleDimsLen);
     carbonFactDataHandlerModel.setCarbonDataFileAttributes(carbonDataFileAttributes);
     carbonFactDataHandlerModel.setCarbonDataDirectoryPath(carbonDataDirectoryPath);
     carbonFactDataHandlerModel.setIsUseInvertedIndex(isUseInvertedIndex);
+    carbonFactDataHandlerModel.setBlockSizeInMB(carbonTable.getBlockSizeInMB());
+    carbonFactDataHandlerModel.setUseKettle(false);
     if (noDictionaryCount > 0 || complexDimensionCount > 0) {
       carbonFactDataHandlerModel.setMdKeyIndex(measureCount + 1);
     } else {
@@ -536,5 +551,12 @@ public class CarbonFactDataHandlerModel {
     this.wrapperColumnSchema = wrapperColumnSchema;
   }
 
+  public boolean isUseKettle() {
+    return useKettle;
+  }
+
+  public void setUseKettle(boolean useKettle) {
+    this.useKettle = useKettle;
+  }
 }
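
Note the corrected bookkeeping in createCarbonFactDataHandlerModel above: simpleDimsCount now also subtracts the no-dictionary columns, so simpleDimsLen (and the dimension count passed to the handler) covers only the dictionary-encoded dimensions. A tiny sketch with made-up values:

public class DimensionCountSketch {
  public static void main(String[] args) {
    int dimensionCount = 6;        // all dims from the load configuration
    int noDictionaryCount = 2;     // dims kept as raw byte[] (no dictionary)
    int complexDimensionCount = 1; // struct/array dims

    // Old: 6 - 1 = 5 (no-dictionary dims wrongly counted as dictionary dims)
    // New: 6 - 2 - 1 = 3 (dictionary-encoded dims only)
    int simpleDimsCount = dimensionCount - noDictionaryCount - complexDimensionCount;
    System.out.println("simpleDimsCount = " + simpleDimsCount);
  }
}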
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
index f9c77fd..1579415 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
@@ -97,6 +97,8 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
 
   private char[] aggType;
 
+  private boolean useKettle;
+
   /**
    * below code is to check whether dimension
    * is of no dictionary type or not
@@ -105,7 +107,7 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
 
   public SingleThreadFinalSortFilesMerger(String tempFileLocation, String tableName,
       int dimensionCount, int complexDimensionCount, int measureCount, int noDictionaryCount,
-      char[] aggType, boolean[] isNoDictionaryColumn) {
+      char[] aggType, boolean[] isNoDictionaryColumn, boolean useKettle) {
     this.tempFileLocation = tempFileLocation;
     this.tableName = tableName;
     this.dimensionCount = dimensionCount;
@@ -114,6 +116,7 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
     this.aggType = aggType;
     this.noDictionaryCount = noDictionaryCount;
     this.isNoDictionaryColumn = isNoDictionaryColumn;
+    this.useKettle = useKettle;
   }
 
   /**
@@ -178,7 +181,8 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
           // create chunk holder
           SortTempFileChunkHolder sortTempFileChunkHolder =
               new SortTempFileChunkHolder(tempFile, dimensionCount, complexDimensionCount,
-                  measureCount, fileBufferSize, noDictionaryCount, aggType, isNoDictionaryColumn);
+                  measureCount, fileBufferSize, noDictionaryCount, aggType, isNoDictionaryColumn,
+                  useKettle);
 
           // initialize
           sortTempFileChunkHolder.initialize();

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/BadRecordsLogger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/BadRecordsLogger.java b/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/BadRecordsLogger.java
new file mode 100644
index 0000000..bac2196
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/BadRecordsLogger.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.processing.surrogatekeysgenerator.csvbased;
+
+import java.io.BufferedWriter;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory.FileType;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+public class BadRecordsLogger {
+
+  /**
+   * Comment for <code>LOGGER</code>
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(BadRecordsLogger.class.getName());
+  /**
+   * Holds the task key when any bad record is found, so callers can check
+   * it via the API and update the load status
+   */
+  private static Map<String, String> badRecordEntry =
+      new HashMap<String, String>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+  /**
+   * File Name
+   */
+  private String fileName;
+  /**
+   * Store path
+   */
+  private String storePath;
+  /**
+   * FileChannel
+   */
+  private BufferedWriter bufferedWriter;
+  private DataOutputStream outStream;
+  /**
+   * csv file writer
+   */
+  private BufferedWriter bufferedCSVWriter;
+  private DataOutputStream outCSVStream;
+  /**
+   * bad record log file path
+   */
+  private String logFilePath;
+  /**
+   * csv file path
+   */
+  private String csvFilePath;
+
+  /**
+   * task key which is DatabaseName/TableName/tablename
+   */
+  private String taskKey;
+
+  private boolean badRecordsLogRedirect;
+
+  private boolean badRecordLoggerEnable;
+
+  private boolean badRecordConvertNullDisable;
+
+  // private final Object syncObject =new Object();
+
+  public BadRecordsLogger(String key, String fileName, String storePath,
+      boolean badRecordsLogRedirect, boolean badRecordLoggerEnable,
+      boolean badRecordConvertNullDisable) {
+    // Initially no bad rec
+    taskKey = key;
+    this.fileName = fileName;
+    this.storePath = storePath;
+    this.badRecordsLogRedirect = badRecordsLogRedirect;
+    this.badRecordLoggerEnable = badRecordLoggerEnable;
+    this.badRecordConvertNullDisable = badRecordConvertNullDisable;
+  }
+
+  /**
+   * @param key DatabaseName/TableName/tablename
+   * @return "Partially" if bad records were found for the key, removing the entry
+   */
+  public static String hasBadRecord(String key) {
+    return badRecordEntry.remove(key);
+  }
+
+  public void addBadRecordsToBuilder(Object[] row, String reason) {
+    if (badRecordsLogRedirect || badRecordLoggerEnable) {
+      StringBuilder logStrings = new StringBuilder();
+      int size = row.length;
+      int count = size;
+      for (int i = 0; i < size; i++) {
+        if (null == row[i]) {
+          char ch =
+              logStrings.length() > 0 ? logStrings.charAt(logStrings.length() - 1) : (char) -1;
+          if (ch == ',') {
+            logStrings = logStrings.deleteCharAt(logStrings.lastIndexOf(","));
+          }
+          break;
+        } else if (CarbonCommonConstants.MEMBER_DEFAULT_VAL.equals(row[i].toString())) {
+          logStrings.append("null");
+        } else {
+          logStrings.append(row[i]);
+        }
+        if (count > 1) {
+          logStrings.append(',');
+        }
+        count--;
+      }
+      if (badRecordsLogRedirect) {
+        writeBadRecordsToCSVFile(logStrings);
+      }
+      if (badRecordLoggerEnable) {
+        logStrings.append("----->");
+        if (null != reason) {
+          if (reason.indexOf(CarbonCommonConstants.MEMBER_DEFAULT_VAL) > -1) {
+            logStrings
+                .append(reason.replace(CarbonCommonConstants.MEMBER_DEFAULT_VAL, "null"));
+          } else {
+            logStrings.append(reason);
+          }
+        }
+        writeBadRecordsToFile(logStrings);
+      }
+    } else {
+      // set the partial success entry: even when bad records are neither
+      // logged nor redirected, the load status should still be partial success
+      badRecordEntry.put(taskKey, "Partially");
+    }
+  }
+
+  /**
+   * method will write the row having bad record in the log file.
+   */
+  private synchronized void writeBadRecordsToFile(StringBuilder logStrings) {
+    if (null == logFilePath) {
+      logFilePath =
+          this.storePath + File.separator + this.fileName + CarbonCommonConstants.LOG_FILE_EXTENSION
+              + CarbonCommonConstants.FILE_INPROGRESS_STATUS;
+    }
+    try {
+      if (null == bufferedWriter) {
+        FileType fileType = FileFactory.getFileType(storePath);
+        if (!FileFactory.isFileExist(this.storePath, fileType)) {
+          // create the folders if not exist
+          FileFactory.mkdirs(this.storePath, fileType);
+
+          // create the files
+          FileFactory.createNewFile(logFilePath, fileType);
+        }
+
+        outStream = FileFactory.getDataOutputStream(logFilePath, fileType);
+
+        bufferedWriter = new BufferedWriter(new OutputStreamWriter(outStream,
+            Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
+
+      }
+      bufferedWriter.write(logStrings.toString());
+      bufferedWriter.newLine();
+    } catch (FileNotFoundException e) {
+      LOGGER.error("Bad Log Files not found");
+    } catch (IOException e) {
+      LOGGER.error("Error While writing bad log File");
+    } finally {
+      // if the bad record file was created, the load is partially successful;
+      // an entry present for this key means the task encountered bad records
+      // for that key
+      badRecordEntry.put(taskKey, "Partially");
+    }
+  }
+
+  /**
+   * method will write the row having bad record in the csv file.
+   *
+   * @param logStrings
+   */
+  private synchronized void writeBadRecordsToCSVFile(StringBuilder logStrings) {
+    if (null == csvFilePath) {
+      csvFilePath =
+          this.storePath + File.separator + this.fileName + CarbonCommonConstants.CSV_FILE_EXTENSION
+              + CarbonCommonConstants.FILE_INPROGRESS_STATUS;
+    }
+    try {
+      if (null == bufferedCSVWriter) {
+        FileType fileType = FileFactory.getFileType(storePath);
+        if (!FileFactory.isFileExist(this.storePath, fileType)) {
+          // create the folders if not exist
+          FileFactory.mkdirs(this.storePath, fileType);
+
+          // create the files
+          FileFactory.createNewFile(csvFilePath, fileType);
+        }
+
+        outCSVStream = FileFactory.getDataOutputStream(csvFilePath, fileType);
+
+        bufferedCSVWriter = new BufferedWriter(new OutputStreamWriter(outCSVStream,
+            Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
+
+      }
+      bufferedCSVWriter.write(logStrings.toString());
+      bufferedCSVWriter.newLine();
+    } catch (FileNotFoundException e) {
+      LOGGER.error("Bad record csv Files not found");
+    } catch (IOException e) {
+      LOGGER.error("Error While writing bad record csv File");
+    }
+    finally {
+      badRecordEntry.put(taskKey, "Partially");
+    }
+  }
+
+  public boolean isBadRecordConvertNullDisable() {
+    return badRecordConvertNullDisable;
+  }
+
+  /**
+   * closes the bad record log and csv streams
+   */
+  public synchronized void closeStreams() {
+    CarbonUtil.closeStreams(bufferedWriter, outStream, bufferedCSVWriter, outCSVStream);
+  }
+
+}
+
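
Compared with the deleted BadRecordslogger below, the new class takes the redirect/log/convert-null flags once in the constructor rather than on every addBadRecordsToBuilder call. A hedged usage sketch against the API shown above (key, paths and flag values are made up; compiles only against the processing module):

import org.apache.carbondata.processing.surrogatekeysgenerator.csvbased.BadRecordsLogger;

public class BadRecordsLoggerUsageSketch {
  public static void main(String[] args) {
    BadRecordsLogger logger = new BadRecordsLogger(
        "default/sales/sales",     // task key: DatabaseName/TableName/tablename
        "sales_task_0",            // stem of the .log / .csv file names
        "/tmp/carbon/badrecords",  // directory for the bad record files
        true,                      // badRecordsLogRedirect: write raw rows to csv
        true,                      // badRecordLoggerEnable: write rows + reason to log
        false);                    // badRecordConvertNullDisable

    logger.addBadRecordsToBuilder(
        new Object[] { "1", "not-a-number" }, "measure value is not numeric");
    logger.closeStreams();

    // After the task: "Partially" if bad records were seen for the key, else null.
    System.out.println(BadRecordsLogger.hasBadRecord("default/sales/sales"));
  }
}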

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/BadRecordslogger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/BadRecordslogger.java b/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/BadRecordslogger.java
deleted file mode 100644
index ba33212..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/BadRecordslogger.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.processing.surrogatekeysgenerator.csvbased;
-
-import java.io.BufferedWriter;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.nio.charset.Charset;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
-import org.apache.carbondata.core.datastorage.store.impl.FileFactory.FileType;
-import org.apache.carbondata.core.util.CarbonUtil;
-
-public class BadRecordslogger {
-
-  /**
-   * Comment for <code>LOGGER</code>
-   */
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(BadRecordslogger.class.getName());
-  /**
-   * Which holds the key and if any bad rec found to check from API to update
-   * the status
-   */
-  private static Map<String, String> badRecordEntry =
-      new HashMap<String, String>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-  /**
-   * File Name
-   */
-  private String fileName;
-  /**
-   * Store path
-   */
-  private String storePath;
-  /**
-   * FileChannel
-   */
-  private BufferedWriter bufferedWriter;
-  private DataOutputStream outStream;
-  /**
-   * csv file writer
-   */
-  private BufferedWriter bufferedCSVWriter;
-  private DataOutputStream outCSVStream;
-  /**
-   * bad record log file path
-   */
-  private String logFilePath;
-  /**
-   * csv file path
-   */
-  private String csvFilePath;
-
-  /**
-   * task key which is DatabaseName/TableName/tablename
-   */
-  private String taskKey;
-
-  // private final Object syncObject =new Object();
-
-  public BadRecordslogger(String key, String fileName, String storePath) {
-    // Initially no bad rec
-    taskKey = key;
-    this.fileName = fileName;
-    this.storePath = storePath;
-  }
-
-  /**
-   * @param key DatabaseName/TableName/tablename
-   * @return return "Partially" and remove from map
-   */
-  public static String hasBadRecord(String key) {
-    return badRecordEntry.remove(key);
-  }
-
-  public void addBadRecordsToBuilder(Object[] row, String reason, String valueComparer,
-      boolean badRecordsLogRedirect, boolean badRecordLoggerEnable) {
-    if (badRecordsLogRedirect || badRecordLoggerEnable) {
-      StringBuilder logStrings = new StringBuilder();
-      int size = row.length;
-      int count = size;
-      for (int i = 0; i < size; i++) {
-        if (null == row[i]) {
-          char ch =
-              logStrings.length() > 0 ? logStrings.charAt(logStrings.length() - 1) : (char) -1;
-          if (ch == ',') {
-            logStrings = logStrings.deleteCharAt(logStrings.lastIndexOf(","));
-          }
-          break;
-        } else if (CarbonCommonConstants.MEMBER_DEFAULT_VAL.equals(row[i].toString())) {
-          logStrings.append(valueComparer);
-        } else {
-          logStrings.append(row[i]);
-        }
-        if (count > 1) {
-          logStrings.append(',');
-        }
-        count--;
-      }
-      if (badRecordsLogRedirect) {
-        writeBadRecordsToCSVFile(logStrings);
-      }
-      if (badRecordLoggerEnable) {
-        logStrings.append("----->");
-        if (null != reason) {
-          if (reason.indexOf(CarbonCommonConstants.MEMBER_DEFAULT_VAL) > -1) {
-            logStrings
-                .append(reason.replace(CarbonCommonConstants.MEMBER_DEFAULT_VAL, valueComparer));
-          } else {
-            logStrings.append(reason);
-          }
-        }
-        writeBadRecordsToFile(logStrings);
-      }
-    } else {
-      // setting partial success entry since even if bad records are there then load
-      // status should be partial success regardless of bad record logged
-      badRecordEntry.put(taskKey, "Partially");
-    }
-  }
-
-  /**
-   *
-   */
-  private synchronized void writeBadRecordsToFile(StringBuilder logStrings) {
-    if (null == logFilePath) {
-      logFilePath =
-          this.storePath + File.separator + this.fileName + CarbonCommonConstants.LOG_FILE_EXTENSION
-              + CarbonCommonConstants.FILE_INPROGRESS_STATUS;
-    }
-    try {
-      if (null == bufferedWriter) {
-        FileType fileType = FileFactory.getFileType(storePath);
-        if (!FileFactory.isFileExist(this.storePath, fileType)) {
-          // create the folders if not exist
-          FileFactory.mkdirs(this.storePath, fileType);
-
-          // create the files
-          FileFactory.createNewFile(logFilePath, fileType);
-        }
-
-        outStream = FileFactory.getDataOutputStream(logFilePath, fileType);
-
-        bufferedWriter = new BufferedWriter(new OutputStreamWriter(outStream,
-            Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
-
-      }
-      bufferedWriter.write(logStrings.toString());
-      bufferedWriter.newLine();
-    } catch (FileNotFoundException e) {
-      LOGGER.error("Bad Log Files not found");
-    } catch (IOException e) {
-      LOGGER.error("Error While writing bad log File");
-    } finally {
-      // if the Bad record file is created means it partially success
-      // if any entry present with key that means its have bad record for
-      // that key
-      badRecordEntry.put(taskKey, "Partially");
-    }
-  }
-
-  /**
-   * method will write the row having bad record in the csv file.
-   *
-   * @param logStrings
-   */
-  private synchronized void writeBadRecordsToCSVFile(StringBuilder logStrings) {
-    if (null == csvFilePath) {
-      csvFilePath =
-          this.storePath + File.separator + this.fileName + CarbonCommonConstants.CSV_FILE_EXTENSION
-              + CarbonCommonConstants.FILE_INPROGRESS_STATUS;
-    }
-    try {
-      if (null == bufferedCSVWriter) {
-        FileType fileType = FileFactory.getFileType(storePath);
-        if (!FileFactory.isFileExist(this.storePath, fileType)) {
-          // create the folders if not exist
-          FileFactory.mkdirs(this.storePath, fileType);
-
-          // create the files
-          FileFactory.createNewFile(csvFilePath, fileType);
-        }
-
-        outCSVStream = FileFactory.getDataOutputStream(csvFilePath, fileType);
-
-        bufferedCSVWriter = new BufferedWriter(new OutputStreamWriter(outCSVStream,
-            Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
-
-      }
-      bufferedCSVWriter.write(logStrings.toString());
-      bufferedCSVWriter.newLine();
-    } catch (FileNotFoundException e) {
-      LOGGER.error("Bad record csv Files not found");
-    } catch (IOException e) {
-      LOGGER.error("Error While writing bad record csv File");
-    }
-    finally {
-      badRecordEntry.put(taskKey, "Partially");
-    }
-  }
-
-  /**
-   * closeStreams void
-   */
-  public synchronized void closeStreams() {
-    CarbonUtil.closeStreams(bufferedWriter, outStream, bufferedCSVWriter, outCSVStream);
-  }
-
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java b/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
index bfd2941..23bf6d9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
@@ -146,9 +146,9 @@ public class CarbonCSVBasedSeqGenStep extends BaseStep {
   private String csvFilepath;
 
   /**
-   * badRecordslogger
+   * badRecordsLogger
    */
-  private BadRecordslogger badRecordslogger;
+  private BadRecordsLogger badRecordsLogger;
   /**
    * Normalized Hier and HierWriter map
    */
@@ -377,11 +377,6 @@ public class CarbonCSVBasedSeqGenStep extends BaseStep {
           columnsInfo.setColumnSchemaDetailsWrapper(meta.getColumnSchemaDetailsWrapper());
           columnsInfo.setColumnProperties(meta.getColumnPropertiesMap());
           updateBagLogFileName();
-          String key = meta.getDatabaseName() + '/' + meta.getTableName() +
-              '_' + meta.getTableName();
-          badRecordslogger = new BadRecordslogger(key, csvFilepath, getBadLogStoreLocation(
-              meta.getDatabaseName() + '/' + meta.getTableName() + "/" + meta.getTaskNo()));
-
           columnsInfo.setTimeOrdinalIndices(meta.timeOrdinalIndices);
           surrogateKeyGen = new FileStoreSurrogateKeyGenForCSV(columnsInfo, meta.getPartitionID(),
               meta.getSegmentId(), meta.getTaskNo());
@@ -447,6 +442,9 @@ public class CarbonCSVBasedSeqGenStep extends BaseStep {
         }
         serializationNullFormat =
             meta.getTableOptionWrapper().get(SERIALIZATION_NULL_FORMAT.getName());
+        boolean badRecordsLoggerEnable;
+        boolean badRecordsLogRedirect = false;
+        boolean badRecordConvertNullDisable = false;
         badRecordsLoggerEnable = Boolean
             .parseBoolean(meta.getTableOptionWrapper().get(BAD_RECORDS_LOGGER_ENABLE.getName()));
         String bad_records_action =
@@ -472,6 +470,11 @@ public class CarbonCSVBasedSeqGenStep extends BaseStep {
               break;
           }
         }
+        String key = meta.getDatabaseName() + '/' + meta.getTableName() +
+            '_' + meta.getTableName();
+        badRecordsLogger = new BadRecordsLogger(key, csvFilepath, getBadLogStoreLocation(
+            meta.getDatabaseName() + '/' + meta.getTableName() + "/" + meta.getTaskNo()),
+            badRecordsLogRedirect, badRecordsLoggerEnable, badRecordConvertNullDisable);
         HashMap<String, String> dateformatsHashMap = new HashMap<String, String>();
         if (meta.dateFormat != null) {
           String[] dateformats = meta.dateFormat.split(CarbonCommonConstants.COMMA);
@@ -523,7 +526,7 @@ public class CarbonCSVBasedSeqGenStep extends BaseStep {
       }
 
       startReadingProcess(numberOfNodes);
-      badRecordslogger.closeStreams();
+      badRecordsLogger.closeStreams();
       if (!meta.isAggregate()) {
         closeNormalizedHierFiles();
       }
@@ -615,10 +618,6 @@ public class CarbonCSVBasedSeqGenStep extends BaseStep {
    */
   private String serializationNullFormat;
 
-  private boolean badRecordsLoggerEnable;
-  private boolean badRecordsLogRedirect;
-  private boolean badRecordConvertNullDisable;
-
   private List<String> getDenormalizedHierarchies() {
     List<String> hierList = Arrays.asList(meta.hierNames);
     List<String> denormHiers = new ArrayList<String>(10);
@@ -952,10 +951,9 @@ public class CarbonCSVBasedSeqGenStep extends BaseStep {
     // In that case it will have first value empty and other values will be null
     // So If records is coming like this then we need to write this records as a bad Record.
 
-    if (null == r[0] && badRecordConvertNullDisable) {
-      badRecordslogger
-          .addBadRecordsToBuilder(r, "Column Names are coming NULL", "null",
-              badRecordsLogRedirect, badRecordsLoggerEnable);
+    if (null == r[0] && badRecordsLogger.isBadRecordConvertNullDisable()) {
+      badRecordsLogger
+          .addBadRecordsToBuilder(r, "Column Names are coming NULL");
       return null;
     }
 
@@ -995,7 +993,7 @@ public class CarbonCSVBasedSeqGenStep extends BaseStep {
         if (!isSerialized && !isStringDataType[j] && CarbonCommonConstants.MEMBER_DEFAULT_VAL
             .equals(dimensionValue)) {
           addEntryToBadRecords(r, j, columnName, dataTypes[j]);
-          if (badRecordConvertNullDisable) {
+          if (badRecordsLogger.isBadRecordConvertNullDisable()) {
             return null;
           }
         }
@@ -1031,7 +1029,7 @@ public class CarbonCSVBasedSeqGenStep extends BaseStep {
             || msr.length() == 0)) {
           addEntryToBadRecords(r, j, columnName,
               msrDataType[meta.msrMapping[msrCount]].name());
-          if(badRecordConvertNullDisable) {
+          if(badRecordsLogger.isBadRecordConvertNullDisable()) {
             return null;
           }
         } else {
@@ -1043,7 +1041,7 @@ public class CarbonCSVBasedSeqGenStep extends BaseStep {
               if (null == measureValueBasedOnDataType) {
                 addEntryToBadRecords(r, j, columnName,
                     msrDataType[meta.msrMapping[msrCount]].name());
-                if (badRecordConvertNullDisable) {
+                if (badRecordsLogger.isBadRecordConvertNullDisable()) {
                   return null;
                 }
                 LOGGER.warn("Cannot convert : " + msr
@@ -1054,7 +1052,7 @@ public class CarbonCSVBasedSeqGenStep extends BaseStep {
             }
           } catch (NumberFormatException e) {
             addEntryToBadRecords(r, j, columnName, msrDataType[meta.msrMapping[msrCount]].name());
-            if (badRecordConvertNullDisable) {
+            if (badRecordsLogger.isBadRecordConvertNullDisable()) {
               return null;
             }
             LOGGER.warn(
@@ -1217,7 +1215,7 @@ public class CarbonCSVBasedSeqGenStep extends BaseStep {
               if (!isSerialized && surrogateKeyForHrrchy[0] == 1) {
                 addEntryToBadRecords(r, j, columnName,
                     details.getColumnType().name());
-                if(badRecordConvertNullDisable) {
+                if(badRecordsLogger.isBadRecordConvertNullDisable()) {
                   return null;
                 }
               }
@@ -1238,7 +1236,7 @@ public class CarbonCSVBasedSeqGenStep extends BaseStep {
 
             if (!isSerialized ) {
               addEntryToBadRecords(r, j, columnName);
-              if(badRecordConvertNullDisable) {
+              if(badRecordsLogger.isBadRecordConvertNullDisable()) {
                 return null;
               }
             }
@@ -1271,24 +1269,21 @@ public class CarbonCSVBasedSeqGenStep extends BaseStep {
   private void addEntryToBadRecords(Object[] r, int j, String columnName,
       String dataType) {
     dataType= DataTypeUtil.getColumnDataTypeDisplayName(dataType);
-    badRecordslogger.addBadRecordsToBuilder(r,
+    badRecordsLogger.addBadRecordsToBuilder(r,
         "The value " + " \"" + r[j] + "\"" + " with column name " + columnName
-            + " and column data type " + dataType + " is not a valid " + dataType + " type.",
-        "null", badRecordsLogRedirect, badRecordsLoggerEnable);
+            + " and column data type " + dataType + " is not a valid " + dataType + " type.");
   }
 
   private void addEntryToBadRecords(Object[] r, int j, String columnName) {
-    badRecordslogger.addBadRecordsToBuilder(r,
+    badRecordsLogger.addBadRecordsToBuilder(r,
         "Surrogate key for value " + " \"" + r[j] + "\"" + " with column name " + columnName
-            + " not found in dictionary cache", "null", badRecordsLogRedirect,
-        badRecordsLoggerEnable);
+            + " not found in dictionary cache");
   }
 
   private void addMemberNotExistEntry(Object[] r, int j, String columnName) {
-    badRecordslogger.addBadRecordsToBuilder(r,
+    badRecordsLogger.addBadRecordsToBuilder(r,
         "For Coulmn " + columnName + " \"" + r[j] + "\""
-            + " member not exist in the dimension table ", "null", badRecordsLogRedirect,
-        badRecordsLoggerEnable);
+            + " member not exist in the dimension table ");
   }
 
   private void insertHierIfRequired(Object[] out) throws KettleException {

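The hunks above move the three bad-record flags off CarbonCSVBasedSeqGenStep and into the logger itself, so every call site now asks logger.isBadRecordConvertNullDisable() instead of reading a step-local field. Below is a self-contained sketch of the "Partially" load-status handshake the logger keeps across both the old and new class; the behaviour is read from the deleted BadRecordslogger shown earlier, but ConcurrentHashMap is swapped in here for the original static HashMap, and the task key is a placeholder.

// Sketch of the status handshake: every bad-record write marks the task,
// and the status API polls once and clears the entry.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class BadRecordStatusSketch {
  private static final Map<String, String> badRecordEntry = new ConcurrentHashMap<>();

  static void onBadRecord(String taskKey) {
    // each write path ends in a finally block that marks the task
    badRecordEntry.put(taskKey, "Partially");
  }

  static String hasBadRecord(String taskKey) {
    // mirrors BadRecordsLogger.hasBadRecord(key): read-once, then cleared
    return badRecordEntry.remove(taskKey);
  }

  public static void main(String[] args) {
    onBadRecord("default/t1_t1");
    System.out.println(hasBadRecord("default/t1_t1")); // Partially
    System.out.println(hasBadRecord("default/t1_t1")); // null: already consumed
  }
}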
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index d661901..eff59e5 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -19,33 +19,47 @@
 
 package org.apache.carbondata.processing.util;
 
+import java.io.BufferedReader;
+import java.io.DataInputStream;
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.Charset;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.carbon.CarbonDataLoadSchema;
 import org.apache.carbondata.core.carbon.metadata.CarbonMetadata;
 import org.apache.carbondata.core.carbon.metadata.datatype.DataType;
 import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.carbon.path.CarbonStorePath;
 import org.apache.carbondata.core.carbon.path.CarbonTablePath;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastorage.store.filesystem.*;
+import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
 import org.apache.carbondata.core.datastorage.store.impl.FileFactory.FileType;
 import org.apache.carbondata.core.load.LoadMetadataDetails;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.CarbonUtilException;
+import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.processing.datatypes.ArrayDataType;
 import org.apache.carbondata.processing.datatypes.GenericDataType;
 import org.apache.carbondata.processing.datatypes.PrimitiveDataType;
 import org.apache.carbondata.processing.datatypes.StructDataType;
+import org.apache.carbondata.processing.etl.DataLoadingException;
 import org.apache.carbondata.processing.newflow.DataField;
 import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
 
@@ -80,8 +94,7 @@ public final class CarbonDataProcessorUtil {
     } catch (NumberFormatException e) {
       configuredBufferSize = deafultvalue;
     }
-    int fileBufferSize = (configuredBufferSize *
-        CarbonCommonConstants.BYTE_TO_KB_CONVERSION_FACTOR
+    int fileBufferSize = (configuredBufferSize * CarbonCommonConstants.BYTE_TO_KB_CONVERSION_FACTOR
         * CarbonCommonConstants.BYTE_TO_KB_CONVERSION_FACTOR) / numberOfFiles;
     if (fileBufferSize < CarbonCommonConstants.BYTE_TO_KB_CONVERSION_FACTOR) {
       fileBufferSize = CarbonCommonConstants.BYTE_TO_KB_CONVERSION_FACTOR;
@@ -89,7 +102,6 @@ public final class CarbonDataProcessorUtil {
     return fileBufferSize;
   }
 
-
   /**
    * Utility method to get level cardinality string
    *
@@ -164,7 +176,6 @@ public final class CarbonDataProcessorUtil {
     }// CHECKSTYLE:ON
   }
 
-
   public static void checkResult(List<CheckResultInterface> remarks, StepMeta stepMeta,
       String[] input) {
     CheckResult cr;
@@ -274,9 +285,8 @@ public final class CarbonDataProcessorUtil {
       String taskId, String partitionId, String segmentId, boolean isCompactionFlow) {
     String tempLocationKey = databaseName + CarbonCommonConstants.UNDERSCORE + tableName
         + CarbonCommonConstants.UNDERSCORE + taskId;
-    if(isCompactionFlow){
-      tempLocationKey = CarbonCommonConstants
-          .COMPACTION_KEY_WORD + '_' + tempLocationKey;
+    if (isCompactionFlow) {
+      tempLocationKey = CarbonCommonConstants.COMPACTION_KEY_WORD + '_' + tempLocationKey;
     }
 
     String baseStorePath = CarbonProperties.getInstance()
@@ -367,7 +377,7 @@ public final class CarbonDataProcessorUtil {
   // TODO: need to simplify it. Not required create string first.
   public static Map<String, GenericDataType> getComplexTypesMap(DataField[] dataFields) {
     String complexTypeString = getComplexTypeString(dataFields);
-    if (null == complexTypeString) {
+    if (null == complexTypeString || complexTypeString.equals("")) {
       return new LinkedHashMap<>();
     }
     Map<String, GenericDataType> complexTypesMap = new LinkedHashMap<String, GenericDataType>();
@@ -396,4 +406,202 @@ public final class CarbonDataProcessorUtil {
     }
     return complexTypesMap;
   }
+
+  /**
+   * Get the csv file to read if it the path is file otherwise get the first file of directory.
+   *
+   * @param csvFilePath
+   * @return File
+   */
+  public static CarbonFile getCsvFileToRead(String csvFilePath) {
+    CarbonFile csvFile =
+        FileFactory.getCarbonFile(csvFilePath, FileFactory.getFileType(csvFilePath));
+
+    CarbonFile[] listFiles = null;
+    if (csvFile.isDirectory()) {
+      listFiles = csvFile.listFiles(new CarbonFileFilter() {
+        @Override public boolean accept(CarbonFile pathname) {
+          if (!pathname.isDirectory()) {
+            if (pathname.getName().endsWith(CarbonCommonConstants.CSV_FILE_EXTENSION) || pathname
+                .getName().endsWith(CarbonCommonConstants.CSV_FILE_EXTENSION
+                    + CarbonCommonConstants.FILE_INPROGRESS_STATUS)) {
+              return true;
+            }
+          }
+          return false;
+        }
+      });
+    } else {
+      listFiles = new CarbonFile[1];
+      listFiles[0] = csvFile;
+    }
+    return listFiles[0];
+  }
+
+  /**
+   * Get the file header from csv file.
+   */
+  public static String getFileHeader(CarbonFile csvFile)
+      throws DataLoadingException {
+    DataInputStream fileReader = null;
+    BufferedReader bufferedReader = null;
+    String readLine = null;
+
+    FileType fileType = FileFactory.getFileType(csvFile.getAbsolutePath());
+
+    if (!csvFile.exists()) {
+      csvFile = FileFactory
+          .getCarbonFile(csvFile.getAbsolutePath() + CarbonCommonConstants.FILE_INPROGRESS_STATUS,
+              fileType);
+    }
+
+    try {
+      fileReader = FileFactory.getDataInputStream(csvFile.getAbsolutePath(), fileType);
+      bufferedReader =
+          new BufferedReader(new InputStreamReader(fileReader, Charset.defaultCharset()));
+      readLine = bufferedReader.readLine();
+    } catch (FileNotFoundException e) {
+      LOGGER.error(e, "CSV Input File not found  " + e.getMessage());
+      throw new DataLoadingException("CSV Input File not found ", e);
+    } catch (IOException e) {
+      LOGGER.error(e, "Not able to read CSV input File  " + e.getMessage());
+      throw new DataLoadingException("Not able to read CSV input File ", e);
+    } finally {
+      CarbonUtil.closeStreams(fileReader, bufferedReader);
+    }
+
+    return readLine;
+  }
+
+  public static boolean isHeaderValid(String tableName, String header,
+      CarbonDataLoadSchema schema, String delimiter) throws DataLoadingException {
+    delimiter = CarbonUtil.delimiterConverter(delimiter);
+    String[] columnNames =
+        CarbonDataProcessorUtil.getSchemaColumnNames(schema, tableName).toArray(new String[0]);
+    String[] csvHeader = header.toLowerCase().split(delimiter);
+
+    List<String> csvColumnsList = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+
+    for (String column : csvHeader) {
+      csvColumnsList.add(column.replaceAll("\"", "").trim());
+    }
+
+    int count = 0;
+
+    for (String columns : columnNames) {
+      if (csvColumnsList.contains(columns.toLowerCase())) {
+        count++;
+      }
+    }
+    return count == columnNames.length;
+  }
+
+  /**
+   * This method update the column Name
+   *
+   * @param schema
+   * @param tableName
+   */
+  public static Set<String> getSchemaColumnNames(CarbonDataLoadSchema schema, String tableName) {
+    Set<String> columnNames = new HashSet<String>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+    String factTableName = schema.getCarbonTable().getFactTableName();
+    if (tableName.equals(factTableName)) {
+
+      List<CarbonDimension> dimensions =
+          schema.getCarbonTable().getDimensionByTableName(factTableName);
+
+      for (CarbonDimension dimension : dimensions) {
+
+        String foreignKey = null;
+        for (CarbonDataLoadSchema.DimensionRelation dimRel : schema.getDimensionRelationList()) {
+          for (String field : dimRel.getColumns()) {
+            if (dimension.getColName().equals(field)) {
+              foreignKey = dimRel.getRelation().getFactForeignKeyColumn();
+              break;
+            }
+          }
+          if (null != foreignKey) {
+            break;
+          }
+        }
+        if (null == foreignKey) {
+          columnNames.add(dimension.getColName());
+        } else {
+          columnNames.add(foreignKey);
+        }
+      }
+
+      List<CarbonMeasure> measures = schema.getCarbonTable().getMeasureByTableName(factTableName);
+      for (CarbonMeasure msr : measures) {
+        if (!msr.getColumnSchema().isInvisible()) {
+          columnNames.add(msr.getColName());
+        }
+      }
+    } else {
+      List<CarbonDimension> dimensions = schema.getCarbonTable().getDimensionByTableName(tableName);
+      for (CarbonDimension dimension : dimensions) {
+        columnNames.add(dimension.getColName());
+      }
+
+      List<CarbonMeasure> measures = schema.getCarbonTable().getMeasureByTableName(tableName);
+      for (CarbonMeasure msr : measures) {
+        columnNames.add(msr.getColName());
+      }
+    }
+
+    return columnNames;
+
+  }
+
+  /**
+   * Splits header to fields using delimiter.
+   * @param header
+   * @param delimiter
+   * @return
+   */
+  public static String[] getColumnFields(String header, String delimiter) {
+    delimiter = CarbonUtil.delimiterConverter(delimiter);
+    String[] columnNames = header.split(delimiter);
+    String tmpCol;
+    for (int i = 0; i < columnNames.length; i++) {
+      tmpCol = columnNames[i].replaceAll("\"", "");
+      columnNames[i] = tmpCol.trim();
+    }
+
+    return columnNames;
+  }
+
+  /**
+   * get agg type
+   */
+  public static char[] getAggType(int measureCount, String databaseName, String tableName) {
+    char[] aggType = new char[measureCount];
+    Arrays.fill(aggType, 'n');
+    CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(
+        databaseName + CarbonCommonConstants.UNDERSCORE + tableName);
+    List<CarbonMeasure> measures = carbonTable.getMeasureByTableName(tableName);
+    for (int i = 0; i < aggType.length; i++) {
+      aggType[i] = DataTypeUtil.getAggType(measures.get(i).getDataType());
+    }
+    return aggType;
+  }
+
+  /**
+   * Creates map for columns which dateformats mentioned while loading the data.
+   * @param dataFormatString
+   * @return
+   */
+  public static Map<String, String> getDateFormatMap(String dataFormatString) {
+    Map<String, String> dateformatsHashMap = new HashMap<>();
+    if (dataFormatString != null && !dataFormatString.isEmpty()) {
+      String[] dateformats = dataFormatString.split(CarbonCommonConstants.COMMA);
+      for (String dateFormat : dateformats) {
+        String[] dateFormatSplits = dateFormat.split(":", 2);
+        dateformatsHashMap
+            .put(dateFormatSplits[0].toLowerCase().trim(), dateFormatSplits[1].trim());
+      }
+    }
+    return dateformatsHashMap;
+  }
 }
\ No newline at end of file
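
Among the utilities added to CarbonDataProcessorUtil above, getDateFormatMap is easy to exercise in isolation. The following is a runnable restatement with no Carbon dependencies; the literal "," stands in for CarbonCommonConstants.COMMA, and the sample option string is invented.

// Self-contained restatement of getDateFormatMap's parsing logic.
import java.util.HashMap;
import java.util.Map;

public class DateFormatMapSketch {
  static Map<String, String> getDateFormatMap(String dataFormatString) {
    Map<String, String> map = new HashMap<>();
    if (dataFormatString != null && !dataFormatString.isEmpty()) {
      for (String entry : dataFormatString.split(",")) {
        // split(":", 2) keeps any colon inside the pattern itself intact
        String[] parts = entry.split(":", 2);
        map.put(parts[0].toLowerCase().trim(), parts[1].trim());
      }
    }
    return map;
  }

  public static void main(String[] args) {
    System.out.println(getDateFormatMap("doj:yyyy-MM-dd,dob:dd-MM-yyyy HH:mm:ss"));
    // e.g. {doj=yyyy-MM-dd, dob=dd-MM-yyyy HH:mm:ss} (HashMap order not guaranteed)
  }
}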

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/util/RemoveDictionaryUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/RemoveDictionaryUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/RemoveDictionaryUtil.java
index ad31e4a..0a5bbe2 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/RemoveDictionaryUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/RemoveDictionaryUtil.java
@@ -263,6 +263,15 @@ public class RemoveDictionaryUtil {
 
   }
 
+  public static void prepareOutObj(Object[] out, int[] dimArray, byte[][] byteBufferArr,
+      Object[] measureArray) {
+
+    out[IgnoreDictionary.DIMENSION_INDEX_IN_ROW.getIndex()] = dimArray;
+    out[IgnoreDictionary.BYTE_ARRAY_INDEX_IN_ROW.getIndex()] = byteBufferArr;
+    out[IgnoreDictionary.MEASURES_INDEX_IN_ROW.getIndex()] = measureArray;
+
+  }
+
   /**
    * @param row
    * @return

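The new prepareOutObj above packs one output row into three fixed slots keyed by the IgnoreDictionary enum. A sketch of that layout follows, under the assumption that the enum ordinals are 0/1/2; the sample values are invented.

// Sketch of the three-slot row prepareOutObj fills; 0/1/2 are assumed
// stand-ins for the IgnoreDictionary enum indices.
public class PrepareOutObjSketch {
  static final int DIMENSION_INDEX_IN_ROW = 0;   // assumed ordinal
  static final int BYTE_ARRAY_INDEX_IN_ROW = 1;  // assumed ordinal
  static final int MEASURES_INDEX_IN_ROW = 2;    // assumed ordinal

  static void prepareOutObj(Object[] out, int[] dimArray, byte[][] noDictionaryBytes,
      Object[] measureArray) {
    out[DIMENSION_INDEX_IN_ROW] = dimArray;            // dictionary-encoded dimensions
    out[BYTE_ARRAY_INDEX_IN_ROW] = noDictionaryBytes;  // no-dictionary column bytes
    out[MEASURES_INDEX_IN_ROW] = measureArray;         // measure values
  }

  public static void main(String[] args) {
    Object[] row = new Object[3];
    prepareOutObj(row, new int[] {1, 4}, new byte[][] {"abc".getBytes()}, new Object[] {42.0});
    System.out.println(((int[]) row[0]).length + " dims packed");
  }
}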
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/test/java/org/apache/carbondata/test/util/StoreCreator.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/test/util/StoreCreator.java b/processing/src/test/java/org/apache/carbondata/test/util/StoreCreator.java
index 7b15f38..f935ba4 100644
--- a/processing/src/test/java/org/apache/carbondata/test/util/StoreCreator.java
+++ b/processing/src/test/java/org/apache/carbondata/test/util/StoreCreator.java
@@ -85,6 +85,7 @@ import org.apache.carbondata.processing.graphgenerator.GraphGeneratorException;
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
 
 import com.google.gson.Gson;
+import org.apache.hadoop.fs.Path;
 
 /**
  * This class will create store file based on provided schema
@@ -360,7 +361,7 @@ public class StoreCreator {
     DataProcessTaskStatus dataProcessTaskStatus = new DataProcessTaskStatus(databaseName, tableName);
     dataProcessTaskStatus.setCsvFilePath(loadModel.getFactFilePath());
     SchemaInfo info = new SchemaInfo();
-    BlockDetails blockDetails = new BlockDetails(loadModel.getFactFilePath(),
+    BlockDetails blockDetails = new BlockDetails(new Path(loadModel.getFactFilePath()),
         0, new File(loadModel.getFactFilePath()).length(), new String[] {"localhost"});
     GraphGenerator.blockInfo.put("qwqwq", new BlockDetails[] { blockDetails });
     dataProcessTaskStatus.setBlocksID("qwqwq");

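The StoreCreator change above only swaps the first BlockDetails argument from a String to a Hadoop Path. A hedged sketch of the new call shape; the BlockDetails import path and the file path are assumptions, not taken from this patch.

// Call-shape sketch for the updated BlockDetails constructor.
import java.io.File;
import org.apache.hadoop.fs.Path;
import org.apache.carbondata.processing.csvreaderstep.BlockDetails; // assumed package

public class BlockDetailsSketch {
  public static void main(String[] args) {
    String factFilePath = "/tmp/store/data.csv";  // invented sample path
    BlockDetails block = new BlockDetails(new Path(factFilePath),
        0, new File(factFilePath).length(), new String[] {"localhost"});
  }
}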
