carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ravipes...@apache.org
Subject [1/2] incubator-carbondata git commit: [CARBONDATA-233] bad record logger support for non parseable numeric and timestamp data
Date Fri, 07 Oct 2016 17:03:31 GMT
Repository: incubator-carbondata
Updated Branches:
  refs/heads/master d101b6271 -> e8ce3637e


[CARBONDATA-233] bad record logger support for non parseable numeric and timestamp data


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/781a66c3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/781a66c3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/781a66c3

Branch: refs/heads/master
Commit: 781a66c3eab2749d680c566719be43fa00fd37ed
Parents: d101b62
Author: mohammadshahidkhan <mohdshahidkhan1987@gmail.com>
Authored: Mon Sep 12 02:35:17 2016 +0530
Committer: ravipesala <ravi.pesala@gmail.com>
Committed: Fri Oct 7 22:31:47 2016 +0530

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |   6 +
 .../hadoop/test/util/StoreCreator.java          |   3 +
 .../carbondata/spark/load/CarbonLoadModel.java  |  47 ++++
 .../carbondata/spark/load/CarbonLoaderUtil.java |   2 +
 .../org/apache/spark/sql/CarbonSqlParser.scala  |   4 +-
 .../execution/command/carbonTableSchema.scala   |   7 +
 .../test/resources/badrecords/datasample.csv    |   7 +
 .../badrecords/emptyTimeStampValue.csv          |   8 +
 .../test/resources/badrecords/emptyValues.csv   |   8 +
 .../badrecords/insufficientColumns.csv          |   4 +
 .../resources/badrecords/seriazableValue.csv    |   3 +
 .../badrecordloger/BadRecordLoggerTest.scala    | 266 +++++++++++++++++++
 .../processing/api/dataloader/SchemaInfo.java   |  41 +++
 .../processing/constants/LoggerAction.java      |  21 ++
 .../graphgenerator/GraphGenerator.java          |   2 +
 .../csvbased/BadRecordslogger.java              | 117 ++++++--
 .../csvbased/CarbonCSVBasedSeqGenStep.java      | 143 +++++++---
 17 files changed, 621 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/781a66c3/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 5231979..dce5770 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -289,6 +289,12 @@ public final class CarbonCommonConstants {
    * CSV_FILE_EXTENSION
    */
   public static final String CSV_FILE_EXTENSION = ".csv";
+
+  /**
+   * LOG_FILE_EXTENSION
+   */
+  public static final String LOG_FILE_EXTENSION = ".log";
+
   /**
    * COLON_SPC_CHARACTER
    */

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/781a66c3/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
index f4595a5..5caa96a 100644
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
@@ -358,6 +358,9 @@ public class StoreCreator {
     schmaModel.setCommentCharacter("#");
     info.setDatabaseName(databaseName);
     info.setTableName(tableName);
+    info.setSerializationNullFormat("serialization_null_format" + "," + "\\N");
+    info.setBadRecordsLoggerEnable("bad_records_logger_enable"+","+"false");
+    info.setBadRecordsLoggerRedirect("bad_records_logger_action"+","+"force");
 
     generateGraph(schmaModel, info, loadModel.getTableName(), "0", loadModel.getSchema(), null,
         loadModel.getLoadMetadataDetails());

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/781a66c3/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoadModel.java b/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoadModel.java
index 68f3929..5b7f02d 100644
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoadModel.java
+++ b/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoadModel.java
@@ -110,6 +110,17 @@ public class CarbonLoadModel implements Serializable {
   * defines the string that should be treated as null while loading data
    */
   private String serializationNullFormat;
+
+  /**
+   * defines the string to specify whether the bad record logger should be enabled or not
+   */
+  private String badRecordsLoggerEnable;
+
+  /**
+   * defines the option to specify the bad record log redirect to raw csv
+   */
+  private String badRecordsLoggerRedirect;
+
   /**
    * Max number of columns that needs to be parsed by univocity parser
    */
@@ -331,6 +342,8 @@ public class CarbonLoadModel implements Serializable {
     copy.factTimeStamp = factTimeStamp;
     copy.segmentId = segmentId;
     copy.serializationNullFormat = serializationNullFormat;
+    copy.badRecordsLoggerEnable = badRecordsLoggerEnable;
+    copy.badRecordsLoggerRedirect =badRecordsLoggerRedirect;
     copy.escapeChar = escapeChar;
     copy.quoteChar = quoteChar;
     copy.commentChar = commentChar;
@@ -372,6 +385,8 @@ public class CarbonLoadModel implements Serializable {
     copyObj.factTimeStamp = factTimeStamp;
     copyObj.segmentId = segmentId;
     copyObj.serializationNullFormat = serializationNullFormat;
+    copyObj.badRecordsLoggerEnable = badRecordsLoggerEnable;
+    copyObj.badRecordsLoggerRedirect =badRecordsLoggerRedirect;
     copyObj.escapeChar = escapeChar;
     copyObj.quoteChar = quoteChar;
     copyObj.commentChar = commentChar;
@@ -545,6 +560,22 @@ public class CarbonLoadModel implements Serializable {
     this.serializationNullFormat = serializationNullFormat;
   }
 
+  /**
+   * returns the string to enable bad record logger
+   * @return
+   */
+  public String getBadRecordsLoggerEnable() {
+    return badRecordsLoggerEnable;
+  }
+
+  /**
+   * method sets the string to specify whether to enable or disable the bad record logger.
+   * @param badRecordsLoggerEnable
+   */
+  public void setBadRecordsLoggerEnable(String badRecordsLoggerEnable) {
+    this.badRecordsLoggerEnable = badRecordsLoggerEnable;
+  }
+
   public String getQuoteChar() {
     return quoteChar;
   }
@@ -574,4 +605,20 @@ public class CarbonLoadModel implements Serializable {
   public void setMaxColumns(String maxColumns) {
     this.maxColumns = maxColumns;
   }
+
+  /**
+   *  returns option to specify the bad record log redirect to raw csv
+   * @return
+   */
+  public String getBadRecordsLoggerRedirect() {
+    return badRecordsLoggerRedirect;
+  }
+
+  /**
+   * set option to specify the bad record log redirect to raw csv
+   * @param badRecordsLoggerRedirect
+   */
+  public void setBadRecordsLoggerRedirect(String badRecordsLoggerRedirect) {
+    this.badRecordsLoggerRedirect = badRecordsLoggerRedirect;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/781a66c3/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java b/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
index 941210a..9620ebc 100644
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
+++ b/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
@@ -195,6 +195,8 @@ public final class CarbonLoaderUtil {
     info.setComplexDelimiterLevel1(loadModel.getComplexDelimiterLevel1());
     info.setComplexDelimiterLevel2(loadModel.getComplexDelimiterLevel2());
     info.setSerializationNullFormat(loadModel.getSerializationNullFormat());
+    info.setBadRecordsLoggerEnable(loadModel.getBadRecordsLoggerEnable());
+    info.setBadRecordsLoggerRedirect(loadModel.getBadRecordsLoggerRedirect());
 
     generateGraph(schmaModel, info, loadModel, outPutLoc);
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/781a66c3/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
index db53817..49c17f3 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
@@ -87,6 +87,8 @@ class CarbonSqlParser()
   protected val FIELDS = carbonKeyWord("FIELDS")
   protected val FILEHEADER = carbonKeyWord("FILEHEADER")
   protected val SERIALIZATION_NULL_FORMAT = carbonKeyWord("SERIALIZATION_NULL_FORMAT")
+  protected val BAD_RECORDS_LOGGER_ENABLE = carbonKeyWord("BAD_RECORDS_LOGGER_ENABLE")
+  protected val BAD_RECORDS_LOGGER_ACTION = carbonKeyWord("BAD_RECORDS_LOGGER_ACTION")
   protected val FILES = carbonKeyWord("FILES")
   protected val FROM = carbonKeyWord("FROM")
   protected val HIERARCHIES = carbonKeyWord("HIERARCHIES")
@@ -931,7 +933,7 @@ class CarbonSqlParser()
     val options = partionDataOptions.get.groupBy(x => x._1)
     val supportedOptions = Seq("DELIMITER", "QUOTECHAR", "FILEHEADER", "ESCAPECHAR", "MULTILINE",
       "COMPLEX_DELIMITER_LEVEL_1", "COMPLEX_DELIMITER_LEVEL_2", "COLUMNDICT",
-      "SERIALIZATION_NULL_FORMAT",
+      "SERIALIZATION_NULL_FORMAT", "BAD_RECORDS_LOGGER_ENABLE", "BAD_RECORDS_LOGGER_ACTION",
       "ALL_DICTIONARY_PATH", "MAXCOLUMNS", "COMMENTCHAR"
     )
     var isSupported = true

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/781a66c3/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 1486ddd..0ada74a 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -1114,6 +1114,8 @@ private[sql] case class LoadTable(
       val commentchar = partionValues.getOrElse("commentchar", "#")
       val columnDict = partionValues.getOrElse("columndict", null)
       val serializationNullFormat = partionValues.getOrElse("serialization_null_format", "\\N")
+      val badRecordsLoggerEnable = partionValues.getOrElse("bad_records_logger_enable", "false")
+      val badRecordsLoggerRedirect = partionValues.getOrElse("bad_records_logger_action", "force")
       val allDictionaryPath = partionValues.getOrElse("all_dictionary_path", "")
       val complex_delimiter_level_1 = partionValues.getOrElse("complex_delimiter_level_1", "\\$")
       val complex_delimiter_level_2 = partionValues.getOrElse("complex_delimiter_level_2", "\\:")
@@ -1132,6 +1134,11 @@ private[sql] case class LoadTable(
       carbonLoadModel.setCommentChar(commentchar)
       carbonLoadModel.setSerializationNullFormat("serialization_null_format" + "," +
         serializationNullFormat)
+      carbonLoadModel
+        .setBadRecordsLoggerEnable("bad_records_logger_enable" + "," + badRecordsLoggerEnable)
+      carbonLoadModel
+        .setBadRecordsLoggerRedirect("bad_records_logger_action" + "," + badRecordsLoggerRedirect)
+
       if (delimiter.equalsIgnoreCase(complex_delimiter_level_1) ||
           complex_delimiter_level_1.equalsIgnoreCase(complex_delimiter_level_2) ||
           delimiter.equalsIgnoreCase(complex_delimiter_level_2)) {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/781a66c3/integration/spark/src/test/resources/badrecords/datasample.csv
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/resources/badrecords/datasample.csv b/integration/spark/src/test/resources/badrecords/datasample.csv
new file mode 100644
index 0000000..3895102
--- /dev/null
+++ b/integration/spark/src/test/resources/badrecords/datasample.csv
@@ -0,0 +1,7 @@
+ID,date,country,actual_price,Quantity,sold_price
+10000,2015/7/23,china,120000.45,3,140000.377
+10001,null,china,120000.45,3,140000.377
+10003,2015/7/23,null,120000.45,3,140000.377
+10004,2015/7/23,china,120000.45ghf,3,140000.377
+10005,2015/7/23,china,120000.45,3ghf,140000.377
+10006,2015/7/23,china,120000.45,3,140000.377ghf

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/781a66c3/integration/spark/src/test/resources/badrecords/emptyTimeStampValue.csv
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/resources/badrecords/emptyTimeStampValue.csv b/integration/spark/src/test/resources/badrecords/emptyTimeStampValue.csv
new file mode 100644
index 0000000..751b21b
--- /dev/null
+++ b/integration/spark/src/test/resources/badrecords/emptyTimeStampValue.csv
@@ -0,0 +1,8 @@
+ID,date,country,actual_price,Quantity,sold_price
+\N,,\N,\N,\N,\N
+10003,2015/7/23,xyz,120003.45,3,140000.377
+10003gh,2015/7/23,xyz,120003.45,3,140000.377
+10003,20/7/2016,xyz,120003.45,3,140000.377
+10003,2015/7/23,xyz,120003.45gf,3,140000.377
+
+10003,2015/7/23,xyz,120003.45,3,140000.377gf
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/781a66c3/integration/spark/src/test/resources/badrecords/emptyValues.csv
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/resources/badrecords/emptyValues.csv b/integration/spark/src/test/resources/badrecords/emptyValues.csv
new file mode 100644
index 0000000..492d6c2
--- /dev/null
+++ b/integration/spark/src/test/resources/badrecords/emptyValues.csv
@@ -0,0 +1,8 @@
+ID,date,country,actual_price,Quantity,sold_price
+10001,2015/7/23,,120003.45,3,140000.377
+,2015/7/23,,120003.45,3,140000.377
+,,,120003.45,3,140000.377
+,,,,3,140000.377
+,,,,,140000.377
+,,,,,
+10003,2015/7/23,india,120003.45,3,140000.377

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/781a66c3/integration/spark/src/test/resources/badrecords/insufficientColumns.csv
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/resources/badrecords/insufficientColumns.csv b/integration/spark/src/test/resources/badrecords/insufficientColumns.csv
new file mode 100644
index 0000000..24ac3f4
--- /dev/null
+++ b/integration/spark/src/test/resources/badrecords/insufficientColumns.csv
@@ -0,0 +1,4 @@
+ID,date,country,actual_price,Quantity,sold_price
+100,
+10001,2015/7/23,
+10003,2015/7/23,null,120003.45,3,140000.377

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/781a66c3/integration/spark/src/test/resources/badrecords/seriazableValue.csv
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/resources/badrecords/seriazableValue.csv b/integration/spark/src/test/resources/badrecords/seriazableValue.csv
new file mode 100644
index 0000000..2598a90
--- /dev/null
+++ b/integration/spark/src/test/resources/badrecords/seriazableValue.csv
@@ -0,0 +1,3 @@
+ID,date,country,actual_price,Quantity,sold_price
+\N,\N,\N,\N,\N,\N
+10003,2015/7/23,null,120003.45,3,140000.377

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/781a66c3/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerTest.scala
new file mode 100644
index 0000000..82156d6
--- /dev/null
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerTest.scala
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.badrecordloger
+
+import java.io.File
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.common.util.CarbonHiveContext._
+import org.apache.spark.sql.common.util.QueryTest
+import org.apache.spark.sql.hive.HiveContext
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+
+/**
+ * Test class for bad record logger handling of non-parseable
+ * numeric and timestamp values during data load
+ *
+ */
+class BadRecordLoggerTest extends QueryTest with BeforeAndAfterAll {
+  var hiveContext: HiveContext = _
+
+  override def beforeAll {
+    try {
+      sql("drop table IF EXISTS sales")
+      sql("drop table IF EXISTS serializable_values")
+      sql("drop table IF EXISTS serializable_values_false")
+      sql("drop table IF EXISTS insufficientColumn")
+      sql("drop table IF EXISTS insufficientColumn_false")
+      sql("drop table IF EXISTS emptyColumnValues")
+      sql("drop table IF EXISTS emptyColumnValues_false")
+      sql("drop table IF EXISTS empty_timestamp")
+      sql("drop table IF EXISTS empty_timestamp_false")
+      sql(
+        """CREATE TABLE IF NOT EXISTS sales(ID BigInt, date Timestamp, country String,
+          actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'""")
+
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
+          new File("./target/test/badRecords")
+            .getCanonicalPath)
+
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/mm/dd")
+      val currentDirectory = new File(this.getClass.getResource("/").getPath + "/../../")
+        .getCanonicalPath
+      var csvFilePath = currentDirectory + "/src/test/resources/badrecords/datasample.csv"
+      sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales OPTIONS"
+          +
+          "('bad_records_logger_enable'='true','bad_records_logger_action'='redirect', 'DELIMITER'=" +
+          " ',', 'QUOTECHAR'= '\"')");
+
+      // 1.0 "\N" which should be treated as NULL
+      // 1.1 Time stamp "\N" which should be treated as NULL
+      csvFilePath = currentDirectory +
+                    "/src/test/resources/badrecords/seriazableValue.csv"
+      sql(
+        """CREATE TABLE IF NOT EXISTS serializable_values(ID BigInt, date Timestamp, country String,
+          actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'
+        """)
+      sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE serializable_values OPTIONS"
+          +
+          "('bad_records_logger_enable'='true', 'bad_records_logger_action'='ignore', " +
+          "'DELIMITER'= ',', 'QUOTECHAR'= '\"')");
+      // load with bad_records_logger_enable false
+      sql(
+        """CREATE TABLE IF NOT EXISTS serializable_values_false(ID BigInt, date Timestamp,
+           country String,
+          actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'
+        """)
+      sql(
+        "LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE serializable_values_false OPTIONS"
+        + "('bad_records_logger_enable'='false', 'DELIMITER'= ',', 'QUOTECHAR'= '\"')");
+      // 2. insufficient columns - Bad records/Null value based on configuration
+      sql(
+        """CREATE TABLE IF NOT EXISTS insufficientColumn(ID BigInt, date Timestamp, country String,
+          actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'
+        """)
+      csvFilePath = currentDirectory +
+                    "/src/test/resources/badrecords/insufficientColumns.csv"
+      sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE insufficientColumn OPTIONS"
+          +
+          "('bad_records_logger_enable'='true', 'bad_records_logger_action'='ignore', " +
+          "'DELIMITER'= ',', 'QUOTECHAR'= '\"')");
+      // load with bad_records_logger_enable false
+      sql(
+        """CREATE TABLE IF NOT EXISTS insufficientColumn_false(ID BigInt, date Timestamp, country
+            String,
+          actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'
+        """)
+      sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE insufficientColumn_false OPTIONS"
+          + "('bad_records_logger_enable'='false', 'DELIMITER'= ',', 'QUOTECHAR'= '\"')");
+
+      // 3. empty data for string data type - take empty value
+      // 4. empty data for non-string data type - Bad records/Null value based on configuration
+      //table should have only two records.
+      sql(
+        """CREATE TABLE IF NOT EXISTS emptyColumnValues(ID BigInt, date Timestamp, country String,
+          actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'
+        """)
+      csvFilePath = currentDirectory +
+                    "/src/test/resources/badrecords/emptyValues.csv"
+      sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE emptyColumnValues OPTIONS"
+          +
+          "('bad_records_logger_enable'='true', 'bad_records_logger_action'='ignore', " +
+          "'DELIMITER'= ',', 'QUOTECHAR'= '\"')");
+      // load with bad_records_logger_enable to false
+      sql(
+        """CREATE TABLE IF NOT EXISTS emptyColumnValues_false(ID BigInt, date Timestamp, country
+           String,
+          actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'
+        """)
+      csvFilePath = currentDirectory +
+                    "/src/test/resources/badrecords/emptyValues.csv"
+      sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE emptyColumnValues_false OPTIONS"
+          + "('bad_records_logger_enable'='false', 'DELIMITER'= ',', 'QUOTECHAR'= '\"')");
+
+
+      // 4.1 Time stamp empty data - Bad records/Null value based on configuration
+      // 5. non-parsable data - Bad records/Null value based on configuration
+      // 6. empty line(check current one) - Bad records/Null value based on configuration
+      // only one row should be loaded.
+      sql(
+        """CREATE TABLE IF NOT EXISTS empty_timestamp(ID BigInt, date Timestamp, country String,
+          actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'
+        """)
+      csvFilePath = currentDirectory +
+                    "/src/test/resources/badrecords/emptyTimeStampValue.csv"
+      sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE empty_timestamp OPTIONS"
+          +
+          "('bad_records_logger_enable'='true', 'bad_records_logger_action'='ignore', " +
+          "'DELIMITER'= ',', 'QUOTECHAR'= '\"')");
+      // load with bad_records_logger_enable to false
+      sql(
+        """CREATE TABLE IF NOT EXISTS empty_timestamp_false(ID BigInt, date Timestamp, country
+           String,
+          actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'
+        """)
+      csvFilePath = currentDirectory +
+                    "/src/test/resources/badrecords/emptyTimeStampValue.csv"
+      sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE empty_timestamp_false OPTIONS"
+          + "('bad_records_logger_enable'='false', 'DELIMITER'= ',', 'QUOTECHAR'= '\"')");
+
+
+    } catch {
+      case x: Throwable => CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+    }
+  }
+
+  test("select count(*) from sales") {
+    sql("select count(*) from sales").show()
+    checkAnswer(
+      sql("select count(*) from sales"),
+      Seq(Row(2)
+      )
+    )
+  }
+
+  test("select count(*) from serializable_values") {
+    sql("select count(*) from serializable_values").show()
+    checkAnswer(
+      sql("select count(*) from serializable_values"),
+      Seq(Row(2)
+      )
+    )
+  }
+
+  test("select count(*) from serializable_values_false") {
+    sql("select count(*) from serializable_values_false").show()
+    checkAnswer(
+      sql("select count(*) from serializable_values_false"),
+      Seq(Row(2)
+      )
+    )
+  }
+
+  test("select count(*) from empty_timestamp") {
+    sql("select count(*) from empty_timestamp").show()
+    checkAnswer(
+      sql("select count(*) from empty_timestamp"),
+      Seq(Row(1)
+      )
+    )
+  }
+
+  test("select count(*) from insufficientColumn") {
+    sql("select count(*) from insufficientColumn").show()
+    checkAnswer(
+      sql("select count(*) from insufficientColumn"),
+      Seq(Row(1)
+      )
+    )
+  }
+
+  test("select count(*) from insufficientColumn_false") {
+    sql("select count(*) from insufficientColumn_false").show()
+    checkAnswer(
+      sql("select count(*) from insufficientColumn_false"),
+      Seq(Row(3)
+      )
+    )
+  }
+
+
+  test("select count(*) from emptyColumnValues") {
+    sql("select count(*) from emptyColumnValues").show()
+    checkAnswer(
+      sql("select count(*) from emptyColumnValues"),
+      Seq(Row(2)
+      )
+    )
+  }
+
+  test("select count(*) from emptyColumnValues_false") {
+    sql("select count(*) from emptyColumnValues_false").show()
+    checkAnswer(
+      sql("select count(*) from emptyColumnValues_false"),
+      Seq(Row(7)
+      )
+    )
+  }
+
+  test("select count(*) from empty_timestamp_false") {
+    sql("select count(*) from empty_timestamp_false").show()
+    checkAnswer(
+      sql("select count(*) from empty_timestamp_false"),
+      Seq(Row(7)
+      )
+    )
+  }
+
+
+  override def afterAll {
+    sql("drop table sales")
+    sql("drop table serializable_values")
+    sql("drop table serializable_values_false")
+    sql("drop table insufficientColumn")
+    sql("drop table insufficientColumn_false")
+    sql("drop table emptyColumnValues")
+    sql("drop table emptyColumnValues_false")
+    sql("drop table empty_timestamp")
+    sql("drop table empty_timestamp_false")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/781a66c3/processing/src/main/java/org/apache/carbondata/processing/api/dataloader/SchemaInfo.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/api/dataloader/SchemaInfo.java b/processing/src/main/java/org/apache/carbondata/processing/api/dataloader/SchemaInfo.java
index 61d416a..56de5ca 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/api/dataloader/SchemaInfo.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/api/dataloader/SchemaInfo.java
@@ -64,6 +64,16 @@ public class SchemaInfo {
    */
   private String serializationNullFormat;
 
+  /**
+   * defines the string to specify whether the bad record logger should be enabled or not
+   */
+  private String badRecordsLoggerEnable;
+  /**
+   * defines the option to specify whether to redirect the bad record logger to raw csv or not
+   */
+  private String badRecordsLoggerRedirect;
+
+
   public String getComplexDelimiterLevel1() {
     return complexDelimiterLevel1;
   }
@@ -188,4 +198,35 @@ public class SchemaInfo {
     this.serializationNullFormat = serializationNullFormat;
   }
 
+  /**
+   * returns the string to enable bad record logger
+   * @return
+   */
+  public String getBadRecordsLoggerEnable() {
+    return badRecordsLoggerEnable;
+  }
+
+  /**
+   * method sets the string to specify whether to enable or disable the bad record logger.
+   * @param badRecordsLoggerEnable
+   */
+  public void setBadRecordsLoggerEnable(String badRecordsLoggerEnable) {
+    this.badRecordsLoggerEnable = badRecordsLoggerEnable;
+  }
+
+  /**
+   * returns the option to set to redirect the badrecord logger to raw csv
+   * @return
+   */
+  public String getBadRecordsLoggerRedirect() {
+    return badRecordsLoggerRedirect;
+  }
+
+  /**
+   * set the option to set to redirect the badrecord logger to raw csv
+   * @param badRecordsLoggerRedirect
+   */
+  public void setBadRecordsLoggerRedirect(String badRecordsLoggerRedirect) {
+    this.badRecordsLoggerRedirect = badRecordsLoggerRedirect;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/781a66c3/processing/src/main/java/org/apache/carbondata/processing/constants/LoggerAction.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/constants/LoggerAction.java b/processing/src/main/java/org/apache/carbondata/processing/constants/LoggerAction.java
new file mode 100644
index 0000000..bef65a9
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/constants/LoggerAction.java
@@ -0,0 +1,21 @@
+package org.apache.carbondata.processing.constants;
+
+/**
+ * enum to hold the bad record logger action
+ */
+public enum LoggerAction {
+
+  FORCE("FORCE"), // data will be converted to null
+  REDIRECT("REDIRECT"), // no null conversion moved to bad record and written to raw csv
+  IGNORE("IGNORE"); // no null conversion moved to bad record and not written to raw csv
+
+  private String name;
+
+  LoggerAction(String name) {
+    this.name = name;
+  }
+
+  @Override public String toString() {
+    return this.name;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/781a66c3/processing/src/main/java/org/apache/carbondata/processing/graphgenerator/GraphGenerator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/graphgenerator/GraphGenerator.java b/processing/src/main/java/org/apache/carbondata/processing/graphgenerator/GraphGenerator.java
index b10841f..4c2c178 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/graphgenerator/GraphGenerator.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/graphgenerator/GraphGenerator.java
@@ -916,6 +916,8 @@ public class GraphGenerator {
   private TableOptionWrapper getTableOptionWrapper() {
     TableOptionWrapper tableOptionWrapper = TableOptionWrapper.getTableOptionWrapperInstance();
     tableOptionWrapper.setTableOption(schemaInfo.getSerializationNullFormat());
+    tableOptionWrapper.setTableOption(schemaInfo.getBadRecordsLoggerEnable());
+    tableOptionWrapper.setTableOption(schemaInfo.getBadRecordsLoggerRedirect());
     return tableOptionWrapper;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/781a66c3/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/BadRecordslogger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/BadRecordslogger.java b/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/BadRecordslogger.java
index 1035041..c373d62 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/BadRecordslogger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/BadRecordslogger.java
@@ -64,9 +64,15 @@ public class BadRecordslogger {
   private BufferedWriter bufferedWriter;
   private DataOutputStream outStream;
   /**
+   * csv file writer
+   */
+  private BufferedWriter bufferedCSVWriter;
+  private DataOutputStream outCSVStream;
+  /**
    *
    */
   private CarbonFile logFile;
+
   /**
    * task key which is DatabaseName/TableName/tablename
    */
@@ -89,42 +95,61 @@ public class BadRecordslogger {
     return badRecordEntry.remove(key);
   }
 
-  public void addBadRecordsToBilder(Object[] row, int size, String reason, String valueComparer) {
-    StringBuilder logStrings = new StringBuilder();
-    int count = size;
-    for (int i = 0; i < size; i++) {
-      if (null == row[i]) {
-        logStrings.append(row[i]);
-      } else if (CarbonCommonConstants.MEMBER_DEFAULT_VAL.equals(row[i].toString())) {
-        logStrings.append(valueComparer);
-      } else {
-        logStrings.append(row[i]);
+  public void addBadRecordsToBuilder(Object[] row, String reason, String valueComparer,
+      boolean badRecordsLogRedirect, boolean badRecordLoggerEnable) {
+    if (badRecordsLogRedirect || badRecordLoggerEnable) {
+      StringBuilder logStrings = new StringBuilder();
+      int size = row.length;
+      int count = size;
+      for (int i = 0; i < size; i++) {
+        if (null == row[i]) {
+          char ch =
+              logStrings.length() > 0 ? logStrings.charAt(logStrings.length() - 1) : (char) -1;
+          if (ch == ',') {
+            logStrings = logStrings.deleteCharAt(logStrings.lastIndexOf(","));
+          }
+          break;
+        } else if (CarbonCommonConstants.MEMBER_DEFAULT_VAL.equals(row[i].toString())) {
+          logStrings.append(valueComparer);
+        } else {
+          logStrings.append(row[i]);
+        }
+        if (count > 1) {
+          logStrings.append(',');
+        }
+        count--;
       }
-      if (count > 1) {
-        logStrings.append(" , ");
+      if (badRecordsLogRedirect) {
+        writeBadRecordsToCSVFile(logStrings);
       }
-      count--;
-    }
-
-    logStrings.append("----->");
-    if (null != reason) {
-      if (reason.indexOf(CarbonCommonConstants.MEMBER_DEFAULT_VAL) > -1) {
-        logStrings.append(reason.replace(CarbonCommonConstants.MEMBER_DEFAULT_VAL, valueComparer));
-      } else {
-        logStrings.append(reason);
+      if (badRecordLoggerEnable) {
+        logStrings.append("----->");
+        if (null != reason) {
+          if (reason.indexOf(CarbonCommonConstants.MEMBER_DEFAULT_VAL) > -1) {
+            logStrings
+                .append(reason.replace(CarbonCommonConstants.MEMBER_DEFAULT_VAL, valueComparer));
+          } else {
+            logStrings.append(reason);
+          }
+        }
+        writeBadRecordsToFile(logStrings);
       }
+    } else {
+      // setting partial success entry since if bad records are present then the load
+      // status should be partial success regardless of whether they were logged
+      badRecordEntry.put(taskKey, "Partially");
     }
-
-    writeBadRecordsToFile(logStrings);
   }
 
   /**
    *
    */
   private synchronized void writeBadRecordsToFile(StringBuilder logStrings) {
-    String filePath = this.storePath + File.separator + this.fileName
-        + CarbonCommonConstants.FILE_INPROGRESS_STATUS;
+
     if (null == logFile) {
+      String filePath =
+          this.storePath + File.separator + this.fileName + CarbonCommonConstants.LOG_FILE_EXTENSION
+              + CarbonCommonConstants.FILE_INPROGRESS_STATUS;
       logFile = FileFactory.getCarbonFile(filePath, FileFactory.getFileType(filePath));
     }
 
@@ -136,10 +161,10 @@ public class BadRecordslogger {
           FileFactory.mkdirs(this.storePath, fileType);
 
           // create the files
-          FileFactory.createNewFile(filePath, fileType);
+          FileFactory.createNewFile(logFile.getPath(), fileType);
         }
 
-        outStream = FileFactory.getDataOutputStream(filePath, fileType);
+        outStream = FileFactory.getDataOutputStream(logFile.getPath(), fileType);
 
         bufferedWriter = new BufferedWriter(new OutputStreamWriter(outStream,
                 Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
@@ -160,10 +185,46 @@ public class BadRecordslogger {
   }
 
   /**
+   * Writes the given bad record entry to the redirect CSV file.
+   */
+  private synchronized void writeBadRecordsToCSVFile(StringBuilder logStrings) {
+    String filePath =
+        this.storePath + File.separator + this.fileName + CarbonCommonConstants.CSV_FILE_EXTENSION
+            + CarbonCommonConstants.FILE_INPROGRESS_STATUS;
+    try {
+      if (null == bufferedCSVWriter) {
+        FileType fileType = FileFactory.getFileType(storePath);
+        if (!FileFactory.isFileExist(this.storePath, fileType)) {
+          // create the folders if not exist
+          FileFactory.mkdirs(this.storePath, fileType);
+
+          // create the files
+          FileFactory.createNewFile(filePath, fileType);
+        }
+
+        outCSVStream = FileFactory.getDataOutputStream(filePath, fileType);
+
+        bufferedCSVWriter = new BufferedWriter(new OutputStreamWriter(outCSVStream,
+            Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
+
+      }
+      bufferedCSVWriter.write(logStrings.toString());
+      bufferedCSVWriter.newLine();
+    } catch (FileNotFoundException e) {
+      LOGGER.error("Bad Log Files not found");
+    } catch (IOException e) {
+      LOGGER.error("Error While writing bad log File");
+    }
+    finally {
+      badRecordEntry.put(taskKey, "Partially");
+    }
+  }
+
+  /**
    * closeStreams void
    */
   public synchronized void closeStreams() {
-    CarbonUtil.closeStreams(bufferedWriter, outStream);
+    CarbonUtil.closeStreams(bufferedWriter, outStream, bufferedCSVWriter, outCSVStream);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/781a66c3/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java b/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
index f094906..84a1f51 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
@@ -62,6 +62,7 @@ import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.core.writer.ByteArrayHolder;
 import org.apache.carbondata.core.writer.HierarchyValueWriterForCSV;
+import org.apache.carbondata.processing.constants.LoggerAction;
 import org.apache.carbondata.processing.dataprocessor.manager.CarbonDataProcessorManager;
 import org.apache.carbondata.processing.datatypes.GenericDataType;
 import org.apache.carbondata.processing.mdkeygen.file.FileData;
@@ -439,6 +440,30 @@ public class CarbonCSVBasedSeqGenStep extends BaseStep {
               .setDimensionOrdinalToDimensionMapping(populateNameToCarbonDimensionMap());
         }
         serializationNullFormat = meta.getTableOptionWrapper().get("serialization_null_format");
+        badRecordsLoggerEnable =
+            Boolean.parseBoolean(meta.getTableOptionWrapper().get("bad_records_logger_enable"));
+        badRecordConvertNullDisable = true;
+        String bad_records_logger_action =
+            meta.getTableOptionWrapper().get("bad_records_logger_action");
+        if(null != bad_records_logger_action) {
+          LoggerAction loggerAction = null;
+          try {
+            loggerAction = LoggerAction.valueOf(bad_records_logger_action.toUpperCase());
+          } catch (IllegalArgumentException e) {
+            loggerAction = LoggerAction.FORCE;
+          }
+          switch (loggerAction) {
+            case FORCE:
+              badRecordConvertNullDisable = false;
+              break;
+            case REDIRECT:
+              badRecordsLogRedirect = true;
+              break;
+            case IGNORE:
+              badRecordsLogRedirect = false;
+              break;
+          }
+        }
       }
       // no more input to be expected...
       if (r == null) {
@@ -554,6 +579,10 @@ public class CarbonCSVBasedSeqGenStep extends BaseStep {
    */
   private String serializationNullFormat;
 
+  private boolean badRecordsLoggerEnable;
+  private boolean badRecordsLogRedirect;
+  private boolean badRecordConvertNullDisable;
+
   private List<String> getDenormalizedHierarchies() {
     List<String> hierList = Arrays.asList(meta.hierNames);
     List<String> denormHiers = new ArrayList<String>(10);
@@ -725,7 +754,7 @@ public class CarbonCSVBasedSeqGenStep extends BaseStep {
       csvFilepath = csvFilepath.substring(0, csvFilepath.indexOf("."));
     }
 
-    csvFilepath = csvFilepath + '_' + System.currentTimeMillis() + ".log";
+    csvFilepath = csvFilepath + '_' + System.currentTimeMillis();
 
   }
 
@@ -853,7 +882,6 @@ public class CarbonCSVBasedSeqGenStep extends BaseStep {
 
   private Object[] process(Object[] r) throws RuntimeException {
     try {
-      r = changeNullValueToNullString(r);
       Object[] out = populateOutputRow(r);
       if (out != null) {
         for (int i = 0; i < meta.normLength - meta.complexTypes.size(); i++) {
@@ -869,25 +897,6 @@ public class CarbonCSVBasedSeqGenStep extends BaseStep {
     }
   }
 
-  private Object[] changeNullValueToNullString(Object[] rowValue) {
-    int i = 0;
-    for (Object obj : rowValue) {
-      if (obj != null) {
-        //removed valueToCheckAgainst does not make sense to
-        // compare non null object with a null string
-        if (obj.toString().equalsIgnoreCase(serializationNullFormat)) {
-          rowValue[i] = CarbonCommonConstants.MEMBER_DEFAULT_VAL;
-        }
-      } else {
-        rowValue[i] = CarbonCommonConstants.MEMBER_DEFAULT_VAL;
-      }
-
-      i++;
-    }
-
-    return rowValue;
-  }
-
   private Object[] populateOutputRow(Object[] r) throws KettleException {
 
     // Copy the dimension String values to output
@@ -901,9 +910,10 @@ public class CarbonCSVBasedSeqGenStep extends BaseStep {
     // In that case it will have first value empty and other values will be null
     // So If records is coming like this then we need to write this records as a bad Record.
 
-    if (null == r[0]) {
+    if (null == r[0] && badRecordConvertNullDisable) {
       badRecordslogger
-          .addBadRecordsToBilder(r, inputColumnsSize, "Column Names are coming NULL", "null");
+          .addBadRecordsToBuilder(r, "Column Names are coming NULL", "null",
+              badRecordsLogRedirect, badRecordsLoggerEnable);
       return null;
     }
 
@@ -927,10 +937,26 @@ public class CarbonCSVBasedSeqGenStep extends BaseStep {
       String columnName = metaColumnNames[j];
       String foreignKeyColumnName = foreignKeyMappingColumns[j];
       // check if it is ignore dictionary dimension or not . if yes directly write byte buffer
-      String tuple = (String) r[j];
+      String tuple = null == r[j] ?
+          CarbonCommonConstants.MEMBER_DEFAULT_VAL :
+          (String) r[j];
+      // check whether the column value is the value to be serialized as null.
+      boolean isSerialized = false;
+      if(tuple.equalsIgnoreCase(serializationNullFormat)) {
+        tuple = CarbonCommonConstants.MEMBER_DEFAULT_VAL;
+        isSerialized = true;
+      }
       if (isNoDictionaryColumn[j]) {
-        processnoDictionaryDim(noDictionaryAndComplexIndexMapping[j], tuple, dataTypes[j],
-            isStringDataType[j], byteBufferArr);
+        String dimensionValue =
+            processnoDictionaryDim(noDictionaryAndComplexIndexMapping[j], tuple, dataTypes[j],
+                isStringDataType[j], byteBufferArr);
+        if (!isSerialized && !isStringDataType[j] && CarbonCommonConstants.MEMBER_DEFAULT_VAL
+            .equals(dimensionValue)) {
+          addEntryToBadRecords(r, j, columnName, dataTypes[j]);
+          if (badRecordConvertNullDisable) {
+            return null;
+          }
+        }
         continue;
       }
       // There is a possibility that measure can be referred as dimensions also
@@ -941,7 +967,7 @@ public class CarbonCSVBasedSeqGenStep extends BaseStep {
       //3) This column can be measure column
 
       if (measurePresentMapping[j]) {
-        String msr = r[j] == null ? null : r[j].toString();
+        String msr = tuple == null ? null : tuple.toString();
         isNull = CarbonCommonConstants.MEMBER_DEFAULT_VAL.equals(msr);
         if (measureSurrogateReqMapping[j] && !isNull) {
           Integer surrogate = 0;
@@ -959,6 +985,13 @@ public class CarbonCSVBasedSeqGenStep extends BaseStep {
           }
 
           out[memberMapping[dimLen + index]] = surrogate.doubleValue();
+        } else if (!isSerialized &&  (isNull || msr == null
+            || msr.length() == 0)) {
+          addEntryToBadRecords(r, j, columnName,
+              msrDataType[meta.msrMapping[msrCount]].name());
+          if(badRecordConvertNullDisable) {
+            return null;
+          }
         } else {
           try {
             out[memberMapping[dimLen + index] - meta.complexTypes.size()] =
@@ -974,11 +1007,16 @@ public class CarbonCSVBasedSeqGenStep extends BaseStep {
                   .getMeasureValueBasedOnDataType(msr, msrDataType[meta.msrMapping[msrCount]],
                       meta.carbonMeasures[meta.msrMapping[msrCount]]);
             } catch (NumberFormatException ex) {
+              addEntryToBadRecords(r, j, columnName, msrDataType[meta.msrMapping[msrCount]].name());
+              if (badRecordConvertNullDisable) {
+                return null;
+              }
               LOGGER.warn("Cant not convert : " + msr
                   + " to Numeric type value. Value considered as null.");
               out[memberMapping[dimLen + index] - meta.complexTypes.size()] = null;
             }
           }
+
         }
 
         index++;
@@ -1028,7 +1066,7 @@ public class CarbonCSVBasedSeqGenStep extends BaseStep {
             if (null != keyFromCsv) {
               surrogateKeyForHierarchy = cache.get(keyFromCsv);
             } else {
-              addMemberNotExistEntry(r, inputColumnsSize, j, columnName);
+              addMemberNotExistEntry(r, j, columnName);
               return null;
             }
             // If cardinality exceeded for some levels then
@@ -1037,7 +1075,7 @@ public class CarbonCSVBasedSeqGenStep extends BaseStep {
             // getting this scenerio we will log it
             // in bad records
             if (null == surrogateKeyForHierarchy) {
-              addEntryToBadRecords(r, inputColumnsSize, j, columnName);
+              addEntryToBadRecords(r, j, columnName);
               return null;
 
             }
@@ -1094,14 +1132,14 @@ public class CarbonCSVBasedSeqGenStep extends BaseStep {
           if (null != keyFromCsv) {
             surrogateKeyForHrrchy = cache.get(keyFromCsv);
           } else {
-            addMemberNotExistEntry(r, inputColumnsSize, j, columnName);
+            addMemberNotExistEntry(r, j, columnName);
             return null;
           }
           // If cardinality exceeded for some levels then for that hierarchy will not be their
           // so while joining with fact table if we are getting this scenerio we will log it
           // in bad records
           if (null == surrogateKeyForHrrchy) {
-            addEntryToBadRecords(r, inputColumnsSize, j, columnName);
+            addEntryToBadRecords(r, j, columnName);
             return null;
 
           }
@@ -1113,7 +1151,9 @@ public class CarbonCSVBasedSeqGenStep extends BaseStep {
           } else {
             properties = new Object[propIndex.length];
             for (int ind = 0; ind < propIndex.length; ind++) {
-              properties[ind] = r[propIndex[ind]];
+              Object objectValue = r[propIndex[ind]];
+              properties[ind] = null == objectValue ?
+                  CarbonCommonConstants.MEMBER_DEFAULT_VAL : (String)objectValue;
             }
           }
           surrogateKeyForHrrchy = new int[1];
@@ -1133,6 +1173,13 @@ public class CarbonCSVBasedSeqGenStep extends BaseStep {
                       .getDirectDictionaryGenerator(details.getColumnType());
               surrogateKeyForHrrchy[0] =
                   directDictionaryGenerator1.generateDirectSurrogateKey(tuple);
+              if (!isSerialized && surrogateKeyForHrrchy[0] == 1) {
+                addEntryToBadRecords(r, j, columnName,
+                    details.getColumnType().name());
+                if(badRecordConvertNullDisable) {
+                  return null;
+                }
+              }
               surrogateKeyGen.max[m] = Integer.MAX_VALUE;
 
             } else {
@@ -1147,6 +1194,13 @@ public class CarbonCSVBasedSeqGenStep extends BaseStep {
             }
           }
           if (surrogateKeyForHrrchy[0] == CarbonCommonConstants.INVALID_SURROGATE_KEY) {
+
+            if (!isSerialized ) {
+              addEntryToBadRecords(r, j, columnName);
+              if(badRecordConvertNullDisable) {
+                return null;
+              }
+            }
             surrogateKeyForHrrchy[0] = CarbonCommonConstants.MEMBER_DEFAULT_VAL_SURROGATE_KEY;
           }
         }
@@ -1173,16 +1227,26 @@ public class CarbonCSVBasedSeqGenStep extends BaseStep {
     return newArray;
   }
 
-  private void addEntryToBadRecords(Object[] r, int inputRowSize, int j, String columnName) {
-    badRecordslogger.addBadRecordsToBilder(r, inputRowSize,
+  private void addEntryToBadRecords(Object[] r, int j, String columnName,
+      String dataType) {
+    badRecordslogger.addBadRecordsToBuilder(r,
+        "The value " + " \"" + r[j] + "\"" + " with column name " + columnName
+            + " and column data type " + dataType + " is not a valid " + dataType + " type.",
+        "null", badRecordsLogRedirect, badRecordsLoggerEnable);
+  }
+
+  private void addEntryToBadRecords(Object[] r, int j, String columnName) {
+    badRecordslogger.addBadRecordsToBuilder(r,
         "Surrogate key for value " + " \"" + r[j] + "\"" + " with column name " + columnName
-            + " not found in dictionary cache", "null");
+            + " not found in dictionary cache", "null", badRecordsLogRedirect,
+        badRecordsLoggerEnable);
   }
 
-  private void addMemberNotExistEntry(Object[] r, int inputRowSize, int j, String columnName) {
-    badRecordslogger.addBadRecordsToBilder(r, inputRowSize,
+  private void addMemberNotExistEntry(Object[] r, int j, String columnName) {
+    badRecordslogger.addBadRecordsToBuilder(r,
         "For Coulmn " + columnName + " \"" + r[j] + "\""
-            + " member not exist in the dimension table ", "null");
+            + " member not exist in the dimension table ", "null", badRecordsLogRedirect,
+        badRecordsLoggerEnable);
   }
 
   private void insertHierIfRequired(Object[] out) throws KettleException {
@@ -1760,7 +1824,7 @@ public class CarbonCSVBasedSeqGenStep extends BaseStep {
     }
   }
 
-  private void processnoDictionaryDim(int index, String dimensionValue, String dataType,
+  private String processnoDictionaryDim(int index, String dimensionValue, String dataType,
       boolean isStringDataType, ByteBuffer[] out) {
     if (!(isStringDataType)) {
       if (null == DataTypeUtil
@@ -1772,6 +1836,7 @@ public class CarbonCSVBasedSeqGenStep extends BaseStep {
         .wrap(dimensionValue.getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
     buffer.rewind();
     out[index] = buffer;
+    return dimensionValue;
   }
 
   /**


Mime
View raw message