carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ravipes...@apache.org
Subject [1/2] incubator-carbondata git commit: [CARBONDATA-965] dataload fail message is not correct when there is no good data to load
Date Fri, 21 Apr 2017 07:30:45 GMT
Repository: incubator-carbondata
Updated Branches:
  refs/heads/master f59f5ae81 -> 674b71e46


[CARBONDATA-965] dataload fail message is not correct when there is no good data to load


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/9ffe1775
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/9ffe1775
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/9ffe1775

Branch: refs/heads/master
Commit: 9ffe1775ca8bca430cd5c4b9723d4322e2baefaa
Parents: f59f5ae
Author: mohammadshahidkhan <mohdshahidkhan1987@gmail.com>
Authored: Thu Apr 20 19:09:46 2017 +0530
Committer: ravipesala <ravi.pesala@gmail.com>
Committed: Fri Apr 21 12:59:48 2017 +0530

----------------------------------------------------------------------
 .../src/test/resources/badrecords/dummy2.csv    |  2 +
 .../carbondata/spark/load/CarbonLoaderUtil.java | 42 ++++++++++++++++++++
 .../spark/rdd/CarbonDataRDDFactory.scala        | 11 +++++
 .../DataLoadFailAllTypeSortTest.scala           | 36 ++++++++++++++++-
 .../AlterTableValidationTestCase.scala          |  2 +-
 .../UnsafeSingleThreadFinalSortFilesMerger.java |  5 ++-
 6 files changed, 94 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9ffe1775/integration/spark-common-test/src/test/resources/badrecords/dummy2.csv
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/badrecords/dummy2.csv b/integration/spark-common-test/src/test/resources/badrecords/dummy2.csv
new file mode 100644
index 0000000..a28b362
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/badrecords/dummy2.csv
@@ -0,0 +1,2 @@
+name,dob,weight
+"","",""
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9ffe1775/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
index 4806a93..964c536 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
@@ -100,6 +100,48 @@ public final class CarbonLoaderUtil {
     }
   }
 
+  /**
+   * the method returns true if the segment has carbondata file else returns false.
+   *
+   * @param loadModel
+   * @param currentLoad
+   * @return
+   */
+  public static boolean isValidSegment(CarbonLoadModel loadModel,
+      int currentLoad) {
+    CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema()
+        .getCarbonTable();
+    CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(
+        loadModel.getStorePath(), carbonTable.getCarbonTableIdentifier());
+
+    int fileCount = 0;
+    int partitionCount = carbonTable.getPartitionCount();
+    for (int i = 0; i < partitionCount; i++) {
+      String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(i + "",
+          currentLoad + "");
+      CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath,
+          FileFactory.getFileType(segmentPath));
+      CarbonFile[] files = carbonFile.listFiles(new CarbonFileFilter() {
+
+        @Override
+        public boolean accept(CarbonFile file) {
+          return file.getName().endsWith(
+              CarbonTablePath.getCarbonIndexExtension())
+              || file.getName().endsWith(
+              CarbonTablePath.getCarbonDataExtension());
+        }
+
+      });
+      fileCount += files.length;
+      if (files.length > 0) {
+        return true;
+      }
+    }
+    if (fileCount == 0) {
+      return false;
+    }
+    return true;
+  }
   public static void deletePartialLoadDataIfExist(CarbonLoadModel loadModel,
       final boolean isCompactionFlow) throws IOException {
     CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema().getCarbonTable();

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9ffe1775/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 631b2a7..4656c2e 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -810,6 +810,17 @@ object CarbonDataRDDFactory {
         shutdownDictionaryServer(carbonLoadModel, result, false)
         throw new Exception(errorMessage)
       } else {
+        // if segment is empty then fail the data load
+        if (!CarbonLoaderUtil.isValidSegment(carbonLoadModel, currentLoadCount)) {
+          CarbonLoaderUtil.deleteSegment(carbonLoadModel, currentLoadCount)
+          LOGGER.info("********clean up done**********")
+          LOGGER.audit(s"Data load is failed for " +
+                       s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }" +
+                       " as there is no data to load")
+          LOGGER.warn("Cannot write load metadata file as data load failed")
+          shutdownDictionaryServer(carbonLoadModel, result, false)
+          throw new Exception("No Data to load")
+        }
         val metadataDetails = status(0)._2
         if (!isAgg) {
           val status = CarbonLoaderUtil.recordLoadMetadata(currentLoadCount, metadataDetails,

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9ffe1775/integration/spark2/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala
index 478b4d3..0465aa7 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala
@@ -40,6 +40,7 @@ class DataLoadFailAllTypeSortTest extends QueryTest with BeforeAndAfterAll {
     sql("drop table IF EXISTS data_bm")
     sql("drop table IF EXISTS data_bmf")
     sql("drop table IF EXISTS data_tbm")
+    sql("drop table IF EXISTS data_bm_no_good_data")
   }
 
   test("dataload with parallel merge with bad_records_action='FAIL'") {
@@ -122,8 +123,6 @@ class DataLoadFailAllTypeSortTest extends QueryTest with BeforeAndAfterAll {
           "STORED BY 'org.apache.carbondata.format'")
       val testData = s"$resourcesPath/badrecords/dummy.csv"
       sql(s"""LOAD DATA LOCAL INPATH '$testData' INTO table data_bm""")
-
-
     } catch {
       case x: Throwable => {
         assert(x.getMessage.contains("Data load failed due to bad record"))
@@ -174,6 +173,38 @@ class DataLoadFailAllTypeSortTest extends QueryTest with BeforeAndAfterAll {
     }
   }
 
+  test("dataload with LOAD_USE_BATCH_SORT='true' with bad_records_action='REDIRECT'") {
+    try {
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
+          new File("./target/test/badRecords")
+            .getCanonicalPath)
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.LOAD_USE_BATCH_SORT, "true");
+      sql("create table data_bm_no_good_data(name String, dob long, weight int) " +
+          "STORED BY 'org.apache.carbondata.format'")
+      val testData = s"$resourcesPath/badrecords/dummy2.csv"
+      sql(
+        s"""LOAD DATA LOCAL INPATH '$testData' INTO table data_bm_no_good_data options
+           ('IS_EMPTY_DATA_BAD_RECORD'='true','BAD_RECORDS_ACTION'='REDIRECT')""")
+    } catch {
+      case x: Throwable => {
+        assert(x.getMessage.contains("No Data to load"))
+        CarbonProperties.getInstance()
+          .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+      }
+    }
+    finally {
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.LOAD_USE_BATCH_SORT, "false");
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
+          CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT);
+    }
+  }
+
   test("dataload with table bucketing with bad_records_action='FAIL'") {
     try {
       CarbonProperties.getInstance()
@@ -212,6 +243,7 @@ class DataLoadFailAllTypeSortTest extends QueryTest with BeforeAndAfterAll {
     sql("drop table IF EXISTS data_bm")
     sql("drop table IF EXISTS data_bmf")
     sql("drop table IF EXISTS data_tbm")
+    sql("drop table IF EXISTS data_bm_no_good_data")
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9ffe1775/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala
index 93b57c2..90a88f6 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala
@@ -67,7 +67,7 @@ class AlterTableValidationTestCase extends QueryTest with BeforeAndAfterAll {
     s"""LOAD DATA LOCAL INPATH '$resourcesPath/badrecords/datasample.csv' INTO TABLE
          |restructure_bad OPTIONS
          |('DELIMITER'= ',', 'QUOTECHAR'= '\"', 'bad_records_logger_enable'='true',
-         |'bad_records_action'='redirect')"""
+         |'bad_records_action'='force')"""
       .stripMargin)
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9ffe1775/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
index 10c5191..95a337a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
@@ -116,7 +116,10 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
     try {
       File[] filesToMergeSort = getFilesToMergeSort();
       this.fileCounter = rowPages.length + filesToMergeSort.length + merges.size();
-
+      if (fileCounter == 0) {
+        LOGGER.info("No files to merge sort");
+        return;
+      }
       LOGGER.info("Number of row pages: " + this.fileCounter);
 
       // create record holder heap


Mime
View raw message