carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jack...@apache.org
Subject [33/56] [abbrv] carbondata git commit: fix NPE for partition loading
Date Tue, 20 Jun 2017 07:29:39 GMT
fix NPE for partition loading


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/e21debe7
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/e21debe7
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/e21debe7

Branch: refs/heads/streaming_ingest
Commit: e21debe79ded0e6e166b6182278e17148e2e8f5d
Parents: 9a11440
Author: QiangCai <david.caiq@gmail.com>
Authored: Fri Jun 9 17:14:23 2017 +0800
Committer: Venkata Ramana G <ramana.gollamudi@huawei.com>
Committed: Thu Jun 15 19:19:35 2017 +0530

----------------------------------------------------------------------
 .../resources/data_partition_badrecords.csv     | 12 ++++++++
 .../TestDataLoadingForPartitionTable.scala      | 13 +++++++-
 .../spark/rdd/CarbonDataRDDFactory.scala        | 32 +++++++++++++++-----
 .../spark/rdd/CarbonDataRDDFactory.scala        | 32 +++++++++++++++-----
 4 files changed, 74 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/e21debe7/integration/spark-common-test/src/test/resources/data_partition_badrecords.csv
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/data_partition_badrecords.csv b/integration/spark-common-test/src/test/resources/data_partition_badrecords.csv
new file mode 100644
index 0000000..a6dc7f9
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/data_partition_badrecords.csv
@@ -0,0 +1,12 @@
+intField1, stringField1, intField2
+
+,
+,,
+1,
+2,b
+3,c,13
+4,d,14,d
+5,e,
+6,f, ,16
+7,g,g
+8,h,h,18
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e21debe7/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
index 8a35558..b5858b4 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
@@ -18,13 +18,13 @@ package org.apache.carbondata.spark.testsuite.partition
 
 import org.apache.spark.sql.common.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
-
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.CarbonMetadata
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.spark.sql.Row
 
 class TestDataLoadingForPartitionTable extends QueryTest with BeforeAndAfterAll {
 
@@ -281,6 +281,16 @@ class TestDataLoadingForPartitionTable extends QueryTest with BeforeAndAfterAll
   }
 
 
+  test("badrecords on partition column") {
+    sql("create table badrecordsPartition(intField1 int, stringField1 string) partitioned by (intField2 int) stored by 'carbondata' tblproperties('partition_type'='hash', 'num_partitions'='5')")
+    sql(s"load data local inpath '$resourcesPath/data_partition_badrecords.csv' into table badrecordsPartition options('bad_records_action'='force')")
+
+    checkAnswer(sql("select count(*) cnt from badrecordsPartition where intField2 = 13"), Seq(Row(1)))
+    checkAnswer(sql("select count(*) cnt from badrecordsPartition where intField2 = 14"), Seq(Row(1)))
+    checkAnswer(sql("select count(*) cnt from badrecordsPartition where intField2 is null"), Seq(Row(9)))
+    checkAnswer(sql("select count(*) cnt from badrecordsPartition where intField2 is not null"), Seq(Row(2)))
+  }
+
   override def afterAll = {
     dropTable
     if (defaultTimestampFormat == null) {
@@ -306,6 +316,7 @@ class TestDataLoadingForPartitionTable extends QueryTest with BeforeAndAfterAll
     sql("drop table if exists multiInserts")
     sql("drop table if exists loadAndInsert")
     sql("drop table if exists listTableUpper")
+    sql("drop table if exists badrecordsPartition")
   }
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e21debe7/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index de0fd5a..3dcf8af 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -1001,9 +1001,12 @@ object CarbonDataRDDFactory {
     val partitionColumnDataType = partitionInfo.getColumnSchemaList.get(0).getDataType
     val columns = carbonLoadModel.getCsvHeaderColumns
     var partitionColumnIndex = -1
-    for (i <- 0 until columns.length) {
-      if (partitionColumn.equalsIgnoreCase(columns(i))) {
-        partitionColumnIndex = i
+    breakable {
+      for (i <- 0 until columns.length) {
+        if (partitionColumn.equalsIgnoreCase(columns(i))) {
+          partitionColumnIndex = i
+          break
+        }
       }
     }
     if (partitionColumnIndex == -1) {
@@ -1023,8 +1026,13 @@ object CarbonDataRDDFactory {
       val serializationNullFormat =
         carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
       dataFrame.get.rdd.map { row =>
-        (CarbonScalaUtil.getString(row.get(partitionColumnIndex), serializationNullFormat,
-          delimiterLevel1, delimiterLevel2, timeStampFormat, dateFormat), row)
+        if (null != row && row.length > partitionColumnIndex &&
+          null != row.get(partitionColumnIndex)) {
+          (CarbonScalaUtil.getString(row.get(partitionColumnIndex), serializationNullFormat,
+            delimiterLevel1, delimiterLevel2, timeStampFormat, dateFormat), row)
+        } else {
+          (null, row)
+        }
       }
     } else {
       // input data from csv files
@@ -1039,8 +1047,18 @@ object CarbonDataRDDFactory {
         classOf[StringArrayWritable],
         hadoopConfiguration
       ).map { currentRow =>
-        val row = new StringArrayRow(new Array[String](columnCount))
-        (currentRow._2.get()(partitionColumnIndex), row.setValues(currentRow._2.get()))
+        if (null == currentRow || null == currentRow._2) {
+          val row = new StringArrayRow(new Array[String](columnCount))
+          (null, row)
+        } else {
+          val row = new StringArrayRow(new Array[String](columnCount))
+          val values = currentRow._2.get()
+          if (values != null && values.length > partitionColumnIndex) {
+            (currentRow._2.get()(partitionColumnIndex), row.setValues(currentRow._2.get()))
+          } else {
+            (null, row.setValues(currentRow._2.get()))
+          }
+        }
       }
     }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e21debe7/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 8871f3b..271b56b 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -1025,9 +1025,12 @@ object CarbonDataRDDFactory {
     val partitionColumnDataType = partitionInfo.getColumnSchemaList.get(0).getDataType
     val columns = carbonLoadModel.getCsvHeaderColumns
     var partitionColumnIndex = -1
-    for (i <- 0 until columns.length) {
-      if (partitionColumn.equalsIgnoreCase(columns(i))) {
-        partitionColumnIndex = i
+    breakable {
+      for (i <- 0 until columns.length) {
+        if (partitionColumn.equalsIgnoreCase(columns(i))) {
+          partitionColumnIndex = i
+          break
+        }
       }
     }
     if (partitionColumnIndex == -1) {
@@ -1047,8 +1050,13 @@ object CarbonDataRDDFactory {
       val serializationNullFormat =
         carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
       dataFrame.get.rdd.map { row =>
-        (CarbonScalaUtil.getString(row.get(partitionColumnIndex), serializationNullFormat,
-          delimiterLevel1, delimiterLevel2, timeStampFormat, dateFormat), row)
+        if (null != row && row.length > partitionColumnIndex &&
+          null != row.get(partitionColumnIndex)) {
+          (CarbonScalaUtil.getString(row.get(partitionColumnIndex), serializationNullFormat,
+            delimiterLevel1, delimiterLevel2, timeStampFormat, dateFormat), row)
+        } else {
+          (null, row)
+        }
       }
     } else {
       // input data from csv files
@@ -1063,8 +1071,18 @@ object CarbonDataRDDFactory {
         classOf[StringArrayWritable],
         hadoopConfiguration
       ).map { currentRow =>
-        val row = new StringArrayRow(new Array[String](columnCount))
-        (currentRow._2.get()(partitionColumnIndex), row.setValues(currentRow._2.get()))
+        if (null == currentRow || null == currentRow._2) {
+          val row = new StringArrayRow(new Array[String](columnCount))
+          (null, row)
+        } else {
+          val row = new StringArrayRow(new Array[String](columnCount))
+          val values = currentRow._2.get()
+          if (values != null && values.length > partitionColumnIndex) {
+            (currentRow._2.get()(partitionColumnIndex), row.setValues(currentRow._2.get()))
+          } else {
+            (null, row.setValues(currentRow._2.get()))
+          }
+        }
       }
     }
 


Mime
View raw message