carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ravipes...@apache.org
Subject [35/50] [abbrv] carbondata git commit: [CARBONDATA-1218] In case of data-load failure the BadRecordsLogger.badRecordEntry map holding the task Status is not removing the task Entry
Date Tue, 09 Jan 2018 04:02:03 GMT
[CARBONDATA-1218] In case of data-load failure the BadRecordsLogger.badRecordEntry map holding
the task Status is not removing the task Entry

Problem
With the GLOBAL_SORT scope option, when a data load fails, the task entry holding the task
status is not removed from the BadRecordsLogger.badRecordEntry map.
Because of this, the next load fails even though the data being loaded has no bad
records.

Solution
The map entry must be removed after the load completes, whether it succeeds or fails.
The bad record logger has also been refactored.

This closes #1082


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/837fdd2c
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/837fdd2c
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/837fdd2c

Branch: refs/heads/branch-1.3
Commit: 837fdd2cb91780c9193efb5b37cd107c9fa36591
Parents: bcf3ca3
Author: mohammadshahidkhan <mohdshahidkhan1987@gmail.com>
Authored: Fri Jun 23 12:26:25 2017 +0530
Committer: manishgupta88 <tomanishgupta18@gmail.com>
Committed: Fri Jan 5 10:47:37 2018 +0530

----------------------------------------------------------------------
 .../streaming/CarbonStreamRecordWriter.java     |   4 +-
 .../badrecordloger/BadRecordLoggerTest.scala    |   3 +-
 .../load/DataLoadProcessorStepOnSpark.scala     |  30 +++--
 .../spark/load/GlobalSortHelper.scala           |  14 ++-
 .../processing/loading/BadRecordsLogger.java    |   3 +
 .../loading/BadRecordsLoggerProvider.java       |  96 +++++++++++++++
 .../processing/loading/DataLoadExecutor.java    |  17 +--
 .../steps/DataConverterProcessorStepImpl.java   |  98 +--------------
 ...ConverterProcessorWithBucketingStepImpl.java |  79 +-----------
 .../processing/util/CarbonBadRecordUtil.java    | 122 +++++++++++++++++++
 .../util/CarbonDataProcessorUtil.java           |  57 ---------
 11 files changed, 267 insertions(+), 256 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/837fdd2c/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java
b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java
index bad2f44..7d862d4 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java
@@ -41,6 +41,7 @@ import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.format.FileHeader;
 import org.apache.carbondata.processing.loading.BadRecordsLogger;
+import org.apache.carbondata.processing.loading.BadRecordsLoggerProvider;
 import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
 import org.apache.carbondata.processing.loading.DataField;
 import org.apache.carbondata.processing.loading.DataLoadProcessBuilder;
@@ -49,7 +50,6 @@ import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
 import org.apache.carbondata.processing.loading.parser.RowParser;
 import org.apache.carbondata.processing.loading.parser.impl.RowParserImpl;
-import org.apache.carbondata.processing.loading.steps.DataConverterProcessorStepImpl;
 import org.apache.carbondata.processing.store.writer.AbstractFactDataWriter;
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
 
@@ -150,7 +150,7 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object>
{
 
     // initialize parser and converter
     rowParser = new RowParserImpl(dataFields, configuration);
-    badRecordLogger = DataConverterProcessorStepImpl.createBadRecordLogger(configuration);
+    badRecordLogger = BadRecordsLoggerProvider.createBadRecordLogger(configuration);
     converter = new RowConverterImpl(configuration.getDataFields(), configuration, badRecordLogger);
     configuration.setCardinalityFinder(converter);
     converter.initialize();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/837fdd2c/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerTest.scala
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerTest.scala
index 463ddbf..797a972 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerTest.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.hive.HiveContext
 import org.scalatest.BeforeAndAfterAll
 
-import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.spark.sql.test.util.QueryTest
 
@@ -324,6 +324,7 @@ class BadRecordLoggerTest extends QueryTest with BeforeAndAfterAll {
     sql("drop table empty_timestamp")
     sql("drop table empty_timestamp_false")
     sql("drop table dataloadOptionTests")
+    sql("drop table IF EXISTS loadIssue")
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/837fdd2c/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
index 2c74657..21de003 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
@@ -30,16 +30,16 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException
 import org.apache.carbondata.core.datastore.row.CarbonRow
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.processing.loading.{DataLoadProcessBuilder, TableProcessingOperations}
+import org.apache.carbondata.processing.loading.{BadRecordsLogger, BadRecordsLoggerProvider,
CarbonDataLoadConfiguration, DataLoadProcessBuilder, TableProcessingOperations}
 import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl
 import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.loading.parser.impl.RowParserImpl
 import org.apache.carbondata.processing.loading.sort.SortStepRowUtil
-import org.apache.carbondata.processing.loading.steps.{DataConverterProcessorStepImpl, DataWriterProcessorStepImpl}
+import org.apache.carbondata.processing.loading.steps.DataWriterProcessorStepImpl
 import org.apache.carbondata.processing.sort.sortdata.SortParameters
 import org.apache.carbondata.processing.store.{CarbonFactHandler, CarbonFactHandlerFactory}
-import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil}
+import org.apache.carbondata.processing.util.{CarbonBadRecordUtil, CarbonDataProcessorUtil}
 import org.apache.carbondata.spark.rdd.{NewRddIterator, StringArrayRow}
 import org.apache.carbondata.spark.util.Util
 
@@ -103,18 +103,20 @@ object DataLoadProcessorStepOnSpark {
       rowCounter: Accumulator[Int]): Iterator[CarbonRow] = {
     val model: CarbonLoadModel = modelBroadcast.value.getCopyWithTaskNo(index.toString)
     val conf = DataLoadProcessBuilder.createConfiguration(model)
-    val badRecordLogger = DataConverterProcessorStepImpl.createBadRecordLogger(conf)
+    val badRecordLogger = BadRecordsLoggerProvider.createBadRecordLogger(conf)
     val rowConverter = new RowConverterImpl(conf.getDataFields, conf, badRecordLogger)
     rowConverter.initialize()
 
     TaskContext.get().addTaskCompletionListener { context =>
-      DataConverterProcessorStepImpl.close(badRecordLogger, conf, rowConverter)
-      GlobalSortHelper.badRecordsLogger(model, partialSuccessAccum)
+      val hasBadRecord: Boolean = CarbonBadRecordUtil.hasBadRecord(model)
+      close(conf, badRecordLogger, rowConverter)
+      GlobalSortHelper.badRecordsLogger(model, partialSuccessAccum, hasBadRecord)
     }
 
     TaskContext.get().addTaskFailureListener { (t: TaskContext, e: Throwable) =>
-      DataConverterProcessorStepImpl.close(badRecordLogger, conf, rowConverter)
-      GlobalSortHelper.badRecordsLogger(model, partialSuccessAccum)
+      val hasBadRecord : Boolean = CarbonBadRecordUtil.hasBadRecord(model)
+      close(conf, badRecordLogger, rowConverter)
+      GlobalSortHelper.badRecordsLogger(model, partialSuccessAccum, hasBadRecord)
 
       wrapException(e, model)
     }
@@ -130,6 +132,18 @@ object DataLoadProcessorStepOnSpark {
     }
   }
 
+  def close(conf: CarbonDataLoadConfiguration,
+      badRecordLogger: BadRecordsLogger,
+      rowConverter: RowConverterImpl): Unit = {
+    if (badRecordLogger != null) {
+      badRecordLogger.closeStreams()
+      CarbonBadRecordUtil.renameBadRecord(conf)
+    }
+    if (rowConverter != null) {
+      rowConverter.finish()
+    }
+  }
+
   def convertTo3Parts(
       rows: Iterator[CarbonRow],
       index: Int,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/837fdd2c/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/GlobalSortHelper.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/GlobalSortHelper.scala
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/GlobalSortHelper.scala
index a42680e..4e3fc88 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/GlobalSortHelper.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/GlobalSortHelper.scala
@@ -27,10 +27,16 @@ import org.apache.carbondata.processing.loading.BadRecordsLogger
 object GlobalSortHelper {
   private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
 
-  def badRecordsLogger(loadModel: CarbonLoadModel, badRecordsAccum: Accumulator[Int]): Unit
= {
-    val key = new CarbonTableIdentifier(loadModel.getDatabaseName, loadModel.getTableName,
null)
-      .getBadRecordLoggerKey
-    if (null != BadRecordsLogger.hasBadRecord(key)) {
+  /**
+   *
+   * @param loadModel       Carbon load model instance
+   * @param badRecordsAccum Accumulator to maintain the load state if 0 then success id !0
then
+   *                        partial successfull
+   * @param hasBadRecord    if <code>true<code> then load bad records vice versa.
+   */
+  def badRecordsLogger(loadModel: CarbonLoadModel,
+      badRecordsAccum: Accumulator[Int], hasBadRecord: Boolean): Unit = {
+    if (hasBadRecord) {
       LOGGER.error("Data Load is partially success for table " + loadModel.getTableName)
       badRecordsAccum.add(1)
     } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/837fdd2c/processing/src/main/java/org/apache/carbondata/processing/loading/BadRecordsLogger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/BadRecordsLogger.java
b/processing/src/main/java/org/apache/carbondata/processing/loading/BadRecordsLogger.java
index bc0ce3a..d668329 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/BadRecordsLogger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/BadRecordsLogger.java
@@ -271,6 +271,9 @@ public class BadRecordsLogger {
    * closeStreams void
    */
   public synchronized void closeStreams() {
+    // removing taskKey Entry while closing the stream
+    // This will make sure the cleanup of the task status even in case of some failure.
+    removeBadRecordKey(taskKey);
     CarbonUtil.closeStreams(bufferedWriter, outStream, bufferedCSVWriter, outCSVStream);
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/837fdd2c/processing/src/main/java/org/apache/carbondata/processing/loading/BadRecordsLoggerProvider.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/BadRecordsLoggerProvider.java
b/processing/src/main/java/org/apache/carbondata/processing/loading/BadRecordsLoggerProvider.java
new file mode 100644
index 0000000..614a959
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/BadRecordsLoggerProvider.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading;
+
+import java.io.File;
+
+import org.apache.carbondata.common.constants.LoggerAction;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
+
+/**
+ * This class provides the BadRecordsLogger instance
+ */
+public class BadRecordsLoggerProvider {
+  /**
+   * method returns the BadRecordsLogger instance
+   * @param configuration
+   * @return
+   */
+  public static BadRecordsLogger createBadRecordLogger(CarbonDataLoadConfiguration configuration)
{
+    boolean badRecordsLogRedirect = false;
+    boolean badRecordConvertNullDisable = false;
+    boolean isDataLoadFail = false;
+    boolean badRecordsLoggerEnable = Boolean.parseBoolean(
+        configuration.getDataLoadProperty(DataLoadProcessorConstants.BAD_RECORDS_LOGGER_ENABLE)
+            .toString());
+    Object bad_records_action =
+        configuration.getDataLoadProperty(DataLoadProcessorConstants.BAD_RECORDS_LOGGER_ACTION)
+            .toString();
+    if (null != bad_records_action) {
+      LoggerAction loggerAction = null;
+      try {
+        loggerAction = LoggerAction.valueOf(bad_records_action.toString().toUpperCase());
+      } catch (IllegalArgumentException e) {
+        loggerAction = LoggerAction.FORCE;
+      }
+      switch (loggerAction) {
+        case FORCE:
+          badRecordConvertNullDisable = false;
+          break;
+        case REDIRECT:
+          badRecordsLogRedirect = true;
+          badRecordConvertNullDisable = true;
+          break;
+        case IGNORE:
+          badRecordsLogRedirect = false;
+          badRecordConvertNullDisable = true;
+          break;
+        case FAIL:
+          isDataLoadFail = true;
+          break;
+      }
+    }
+    CarbonTableIdentifier identifier =
+        configuration.getTableIdentifier().getCarbonTableIdentifier();
+    return new BadRecordsLogger(identifier.getBadRecordLoggerKey(),
+        identifier.getTableName() + '_' + System.currentTimeMillis(),
+        getBadLogStoreLocation(configuration,
+            identifier.getDatabaseName() + CarbonCommonConstants.FILE_SEPARATOR + identifier
+                .getTableName() + CarbonCommonConstants.FILE_SEPARATOR + configuration
+                .getSegmentId() + CarbonCommonConstants.FILE_SEPARATOR + configuration.getTaskNo()),
+        badRecordsLogRedirect, badRecordsLoggerEnable, badRecordConvertNullDisable, isDataLoadFail);
+  }
+
+  public static String getBadLogStoreLocation(CarbonDataLoadConfiguration configuration,
+      String storeLocation) {
+    String badLogStoreLocation = (String) configuration
+        .getDataLoadProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH);
+    if (null == badLogStoreLocation) {
+      badLogStoreLocation =
+          CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC);
+    }
+    badLogStoreLocation = badLogStoreLocation + File.separator + storeLocation;
+
+    return badLogStoreLocation;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/carbondata/blob/837fdd2c/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadExecutor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadExecutor.java
b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadExecutor.java
index 10b19b7..fc5c41f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadExecutor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadExecutor.java
@@ -25,6 +25,7 @@ import org.apache.carbondata.processing.loading.exception.BadRecordFoundExceptio
 import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
 import org.apache.carbondata.processing.loading.exception.NoRetryException;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+import org.apache.carbondata.processing.util.CarbonBadRecordUtil;
 
 /**
  * It executes the data load.
@@ -49,8 +50,7 @@ public class DataLoadExecutor {
       // 2. execute the step
       loadProcessorStep.execute();
       // check and remove any bad record key from bad record entry logger static map
-      if (badRecordFound(
-          loadModel.getCarbonDataLoadSchema().getCarbonTable().getCarbonTableIdentifier()))
{
+      if (CarbonBadRecordUtil.hasBadRecord(loadModel)) {
         LOGGER.error("Data Load is partially success for table " + loadModel.getTableName());
       } else {
         LOGGER.info("Data loading is successful for table " + loadModel.getTableName());
@@ -65,9 +65,6 @@ public class DataLoadExecutor {
       LOGGER.error(e, "Data Loading failed for table " + loadModel.getTableName());
       throw new CarbonDataLoadingException(
           "Data Loading failed for table " + loadModel.getTableName(), e);
-    } finally {
-      removeBadRecordKey(
-          loadModel.getCarbonDataLoadSchema().getCarbonTable().getCarbonTableIdentifier());
     }
   }
 
@@ -87,16 +84,6 @@ public class DataLoadExecutor {
   }
 
   /**
-   * This method will remove the bad record key from bad record logger
-   *
-   * @param carbonTableIdentifier
-   */
-  private void removeBadRecordKey(CarbonTableIdentifier carbonTableIdentifier) {
-    String badRecordLoggerKey = carbonTableIdentifier.getBadRecordLoggerKey();
-    BadRecordsLogger.removeBadRecordKey(badRecordLoggerKey);
-  }
-
-  /**
    * Method to clean all the resource
    */
   public void close() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/837fdd2c/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java
b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java
index a0592f6..90a340d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java
@@ -17,28 +17,22 @@
 
 package org.apache.carbondata.processing.loading.steps;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
 import org.apache.carbondata.common.CarbonIterator;
-import org.apache.carbondata.common.constants.LoggerAction;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
-import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep;
 import org.apache.carbondata.processing.loading.BadRecordsLogger;
+import org.apache.carbondata.processing.loading.BadRecordsLoggerProvider;
 import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
 import org.apache.carbondata.processing.loading.DataField;
-import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
 import org.apache.carbondata.processing.loading.converter.RowConverter;
 import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl;
 import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+import org.apache.carbondata.processing.util.CarbonBadRecordUtil;
 
 /**
  * Replace row data fields with dictionary values if column is configured dictionary encoded.
@@ -64,7 +58,7 @@ public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorSte
     super.initialize();
     child.initialize();
     converters = new ArrayList<>();
-    badRecordLogger = createBadRecordLogger(configuration);
+    badRecordLogger = BadRecordsLoggerProvider.createBadRecordLogger(configuration);
     RowConverter converter =
         new RowConverterImpl(child.getOutput(), configuration, badRecordLogger);
     configuration.setCardinalityFinder(converter);
@@ -121,70 +115,12 @@ public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorSte
     throw new UnsupportedOperationException();
   }
 
-  public static BadRecordsLogger createBadRecordLogger(CarbonDataLoadConfiguration configuration)
{
-    boolean badRecordsLogRedirect = false;
-    boolean badRecordConvertNullDisable = false;
-    boolean isDataLoadFail = false;
-    boolean badRecordsLoggerEnable = Boolean.parseBoolean(
-        configuration.getDataLoadProperty(DataLoadProcessorConstants.BAD_RECORDS_LOGGER_ENABLE)
-            .toString());
-    Object bad_records_action =
-        configuration.getDataLoadProperty(DataLoadProcessorConstants.BAD_RECORDS_LOGGER_ACTION)
-            .toString();
-    if (null != bad_records_action) {
-      LoggerAction loggerAction = null;
-      try {
-        loggerAction = LoggerAction.valueOf(bad_records_action.toString().toUpperCase());
-      } catch (IllegalArgumentException e) {
-        loggerAction = LoggerAction.FORCE;
-      }
-      switch (loggerAction) {
-        case FORCE:
-          badRecordConvertNullDisable = false;
-          break;
-        case REDIRECT:
-          badRecordsLogRedirect = true;
-          badRecordConvertNullDisable = true;
-          break;
-        case IGNORE:
-          badRecordsLogRedirect = false;
-          badRecordConvertNullDisable = true;
-          break;
-        case FAIL:
-          isDataLoadFail = true;
-          break;
-      }
-    }
-    CarbonTableIdentifier identifier =
-        configuration.getTableIdentifier().getCarbonTableIdentifier();
-    return new BadRecordsLogger(identifier.getBadRecordLoggerKey(),
-        identifier.getTableName() + '_' + System.currentTimeMillis(),
-        getBadLogStoreLocation(configuration,
-            identifier.getDatabaseName() + CarbonCommonConstants.FILE_SEPARATOR + identifier
-                .getTableName() + CarbonCommonConstants.FILE_SEPARATOR + configuration
-                .getSegmentId() + CarbonCommonConstants.FILE_SEPARATOR + configuration.getTaskNo()),
-        badRecordsLogRedirect, badRecordsLoggerEnable, badRecordConvertNullDisable, isDataLoadFail);
-  }
-
-  public static String getBadLogStoreLocation(CarbonDataLoadConfiguration configuration,
-      String storeLocation) {
-    String badLogStoreLocation = (String) configuration
-        .getDataLoadProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH);
-    if (null == badLogStoreLocation) {
-      badLogStoreLocation =
-          CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC);
-    }
-    badLogStoreLocation = badLogStoreLocation + File.separator + storeLocation;
-
-    return badLogStoreLocation;
-  }
-
   @Override
   public void close() {
     if (!closed) {
       if (null != badRecordLogger) {
         badRecordLogger.closeStreams();
-        renameBadRecord(badRecordLogger, configuration);
+        CarbonBadRecordUtil.renameBadRecord(configuration);
       }
       super.close();
       if (converters != null) {
@@ -197,32 +133,6 @@ public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorSte
     }
   }
 
-  public static void close(BadRecordsLogger badRecordLogger, CarbonDataLoadConfiguration
-      configuration, RowConverter converter) {
-    if (badRecordLogger != null) {
-      badRecordLogger.closeStreams();
-      renameBadRecord(badRecordLogger, configuration);
-    }
-    if (converter != null) {
-      converter.finish();
-    }
-  }
-
-  private static void renameBadRecord(BadRecordsLogger badRecordLogger,
-      CarbonDataLoadConfiguration configuration) {
-    // rename operation should be performed only in case either bad reccords loggers is enabled
-    // or bad records redirect is enabled
-    if (badRecordLogger.isBadRecordLoggerEnable() || badRecordLogger.isBadRecordsLogRedirect())
{
-      // rename the bad record in progress to normal
-      CarbonTableIdentifier identifier =
-          configuration.getTableIdentifier().getCarbonTableIdentifier();
-      CarbonDataProcessorUtil.renameBadRecordsFromInProgressToNormal(configuration,
-          identifier.getDatabaseName() + CarbonCommonConstants.FILE_SEPARATOR + identifier
-              .getTableName() + CarbonCommonConstants.FILE_SEPARATOR + configuration.getSegmentId()
-              + CarbonCommonConstants.FILE_SEPARATOR + configuration.getTaskNo());
-    }
-  }
-
   @Override protected String getStepName() {
     return "Data Converter";
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/837fdd2c/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorWithBucketingStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorWithBucketingStepImpl.java
b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorWithBucketingStepImpl.java
index 82112b7..a1181c9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorWithBucketingStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorWithBucketingStepImpl.java
@@ -17,32 +17,26 @@
 
 package org.apache.carbondata.processing.loading.steps;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
 import org.apache.carbondata.common.CarbonIterator;
-import org.apache.carbondata.common.constants.LoggerAction;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.BucketingInfo;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
-import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep;
 import org.apache.carbondata.processing.loading.BadRecordsLogger;
+import org.apache.carbondata.processing.loading.BadRecordsLoggerProvider;
 import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
 import org.apache.carbondata.processing.loading.DataField;
-import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
 import org.apache.carbondata.processing.loading.converter.RowConverter;
 import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl;
 import org.apache.carbondata.processing.loading.partition.Partitioner;
 import org.apache.carbondata.processing.loading.partition.impl.HashPartitionerImpl;
 import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+import org.apache.carbondata.processing.util.CarbonBadRecordUtil;
 
 /**
  * Replace row data fields with dictionary values if column is configured dictionary encoded.
@@ -71,7 +65,7 @@ public class DataConverterProcessorWithBucketingStepImpl extends AbstractDataLoa
     super.initialize();
     child.initialize();
     converters = new ArrayList<>();
-    badRecordLogger = createBadRecordLogger();
+    badRecordLogger = BadRecordsLoggerProvider.createBadRecordLogger(configuration);
     RowConverter converter =
         new RowConverterImpl(child.getOutput(), configuration, badRecordLogger);
     configuration.setCardinalityFinder(converter);
@@ -146,69 +140,13 @@ public class DataConverterProcessorWithBucketingStepImpl extends AbstractDataLoa
     throw new UnsupportedOperationException();
   }
 
-  private BadRecordsLogger createBadRecordLogger() {
-    boolean badRecordsLogRedirect = false;
-    boolean badRecordConvertNullDisable = false;
-    boolean isDataLoadFail = false;
-    boolean badRecordsLoggerEnable = Boolean.parseBoolean(
-        configuration.getDataLoadProperty(DataLoadProcessorConstants.BAD_RECORDS_LOGGER_ENABLE)
-            .toString());
-    Object bad_records_action =
-        configuration.getDataLoadProperty(DataLoadProcessorConstants.BAD_RECORDS_LOGGER_ACTION)
-            .toString();
-    if (null != bad_records_action) {
-      LoggerAction loggerAction = null;
-      try {
-        loggerAction = LoggerAction.valueOf(bad_records_action.toString().toUpperCase());
-      } catch (IllegalArgumentException e) {
-        loggerAction = LoggerAction.FORCE;
-      }
-      switch (loggerAction) {
-        case FORCE:
-          badRecordConvertNullDisable = false;
-          break;
-        case REDIRECT:
-          badRecordsLogRedirect = true;
-          badRecordConvertNullDisable = true;
-          break;
-        case IGNORE:
-          badRecordsLogRedirect = false;
-          badRecordConvertNullDisable = true;
-          break;
-        case FAIL:
-          isDataLoadFail = true;
-          break;
-      }
-    }
-    CarbonTableIdentifier identifier =
-        configuration.getTableIdentifier().getCarbonTableIdentifier();
-    return new BadRecordsLogger(identifier.getBadRecordLoggerKey(),
-        identifier.getTableName() + '_' + System.currentTimeMillis(), getBadLogStoreLocation(
-        identifier.getDatabaseName() + CarbonCommonConstants.FILE_SEPARATOR + identifier
-            .getTableName() + CarbonCommonConstants.FILE_SEPARATOR + configuration.getSegmentId()
-            + CarbonCommonConstants.FILE_SEPARATOR + configuration.getTaskNo()),
-        badRecordsLogRedirect, badRecordsLoggerEnable, badRecordConvertNullDisable, isDataLoadFail);
-  }
-
-  private String getBadLogStoreLocation(String storeLocation) {
-    String badLogStoreLocation = (String) configuration
-        .getDataLoadProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH);
-    if (null == badLogStoreLocation) {
-      badLogStoreLocation =
-          CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC);
-    }
-    badLogStoreLocation = badLogStoreLocation + File.separator + storeLocation;
-
-    return badLogStoreLocation;
-  }
-
   @Override
   public void close() {
     if (!closed) {
       super.close();
       if (null != badRecordLogger) {
         badRecordLogger.closeStreams();
-        renameBadRecord(configuration);
+        CarbonBadRecordUtil.renameBadRecord(configuration);
       }
       if (converters != null) {
         for (RowConverter converter : converters) {
@@ -217,15 +155,6 @@ public class DataConverterProcessorWithBucketingStepImpl extends AbstractDataLoa
       }
     }
   }
-  private static void renameBadRecord(CarbonDataLoadConfiguration configuration) {
-    // rename the bad record in progress to normal
-    CarbonTableIdentifier identifier =
-        configuration.getTableIdentifier().getCarbonTableIdentifier();
-    CarbonDataProcessorUtil.renameBadRecordsFromInProgressToNormal(configuration,
-        identifier.getDatabaseName() + File.separator + identifier.getTableName()
-            + File.separator + configuration.getSegmentId() + File.separator + configuration
-            .getTaskNo());
-  }
   @Override protected String getStepName() {
     return "Data Converter with Bucketing";
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/837fdd2c/processing/src/main/java/org/apache/carbondata/processing/util/CarbonBadRecordUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonBadRecordUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonBadRecordUtil.java
new file mode 100644
index 0000000..26a6f77
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonBadRecordUtil.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.util;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.processing.loading.BadRecordsLogger;
+import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+
+/**
+ * Common methods used for the bad record handling
+ */
+public class CarbonBadRecordUtil {
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CarbonDataProcessorUtil.class.getName());
+
+  /**
+   * The method used to rename badrecord files from inprogress to normal
+   *
+   * @param configuration
+   */
+  public static void renameBadRecord(CarbonDataLoadConfiguration configuration) {
+    // rename the bad record in progress to normal
+    CarbonTableIdentifier identifier =
+        configuration.getTableIdentifier().getCarbonTableIdentifier();
+    renameBadRecordsFromInProgressToNormal(configuration,
+        identifier.getDatabaseName() + File.separator + identifier.getTableName() + File.separator
+            + configuration.getSegmentId() + File.separator + configuration.getTaskNo());
+  }
+
+  /**
+   * @param configuration
+   * @param storeLocation
+   */
+  private static void renameBadRecordsFromInProgressToNormal(
+      CarbonDataLoadConfiguration configuration, String storeLocation) {
+    // get the base store location
+    String badLogStoreLocation = (String) configuration
+        .getDataLoadProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH);
+    if (null == badLogStoreLocation) {
+      badLogStoreLocation =
+          CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC);
+    }
+    badLogStoreLocation = badLogStoreLocation + File.separator + storeLocation;
+
+    FileFactory.FileType fileType = FileFactory.getFileType(badLogStoreLocation);
+    try {
+      if (!FileFactory.isFileExist(badLogStoreLocation, fileType)) {
+        return;
+      }
+    } catch (IOException e1) {
+      LOGGER.info("bad record folder does not exist");
+    }
+    CarbonFile carbonFile = FileFactory.getCarbonFile(badLogStoreLocation, fileType);
+
+    CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() {
+      @Override public boolean accept(CarbonFile pathname) {
+        if (pathname.getName().indexOf(CarbonCommonConstants.FILE_INPROGRESS_STATUS) > -1) {
+          return true;
+        }
+        return false;
+      }
+    });
+
+    String badRecordsInProgressFileName = null;
+    String changedFileName = null;
+    for (CarbonFile badFiles : listFiles) {
+      badRecordsInProgressFileName = badFiles.getName();
+
+      changedFileName = badLogStoreLocation + File.separator + badRecordsInProgressFileName
+          .substring(0, badRecordsInProgressFileName.lastIndexOf('.'));
+
+      badFiles.renameTo(changedFileName);
+
+      if (badFiles.exists()) {
+        if (!badFiles.delete()) {
+          LOGGER.error("Unable to delete File : " + badFiles.getName());
+        }
+      }
+    }
+  }
+
+  /**
+   * The method removes the entry if exist and returns <code>true</code> if bad records exist
+   * else <code>false</code>
+   *
+   * @param loadModel
+   * @return
+   */
+  public static boolean hasBadRecord(CarbonLoadModel loadModel) {
+    String key = loadModel.getCarbonDataLoadSchema().getCarbonTable().getCarbonTableIdentifier()
+        .getBadRecordLoggerKey();
+    return (null != BadRecordsLogger.hasBadRecord(key));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/837fdd2c/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index beb1ad1..2a4cc00 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -32,12 +32,7 @@ import org.apache.carbondata.common.constants.LoggerAction;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
 import org.apache.carbondata.core.datastore.ColumnType;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.datastore.impl.FileFactory.FileType;
 import org.apache.carbondata.core.metadata.CarbonMetadata;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.datatype.DataType;
@@ -96,58 +91,6 @@ public final class CarbonDataProcessorUtil {
   }
 
   /**
-   * @param configuration
-   * @param storeLocation
-   */
-  public static void renameBadRecordsFromInProgressToNormal(
-      CarbonDataLoadConfiguration configuration, String storeLocation) {
-    // get the base store location
-    String badLogStoreLocation = (String) configuration
-        .getDataLoadProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH);
-    if (null == badLogStoreLocation) {
-      badLogStoreLocation =
-          CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC);
-    }
-    badLogStoreLocation = badLogStoreLocation + File.separator + storeLocation;
-
-    FileType fileType = FileFactory.getFileType(badLogStoreLocation);
-    try {
-      if (!FileFactory.isFileExist(badLogStoreLocation, fileType)) {
-        return;
-      }
-    } catch (IOException e1) {
-      LOGGER.info("bad record folder does not exist");
-    }
-    CarbonFile carbonFile = FileFactory.getCarbonFile(badLogStoreLocation, fileType);
-
-    CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() {
-      @Override public boolean accept(CarbonFile pathname) {
-        if (pathname.getName().indexOf(CarbonCommonConstants.FILE_INPROGRESS_STATUS) > -1) {
-          return true;
-        }
-        return false;
-      }
-    });
-
-    String badRecordsInProgressFileName = null;
-    String changedFileName = null;
-    for (CarbonFile badFiles : listFiles) {
-      badRecordsInProgressFileName = badFiles.getName();
-
-      changedFileName = badLogStoreLocation + File.separator + badRecordsInProgressFileName
-          .substring(0, badRecordsInProgressFileName.lastIndexOf('.'));
-
-      badFiles.renameTo(changedFileName);
-
-      if (badFiles.exists()) {
-        if (!badFiles.delete()) {
-          LOGGER.error("Unable to delete File : " + badFiles.getName());
-        }
-      }
-    }
-  }
-
-  /**
    * This method will be used to delete sort temp location is it is exites
    */
   public static void deleteSortLocationIfExists(String[] locations) {


Mime
View raw message