carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kunalkap...@apache.org
Subject carbondata git commit: [CARBONDATA-2606][Complex DataType Enhancements] Fixed Projection Pushdown when Select filter contains Struct column.
Date Fri, 06 Jul 2018 11:19:49 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master 806e9b5a8 -> c82e3e85f


[CARBONDATA-2606][Complex DataType Enhancements] Fixed Projection Pushdown when
Select filter contains Struct column.

Problem:
If the Select filter contains a Struct column that is not in the Projection list,
then only a null value is stored for the struct column given in the filter, and the
select query result is null.
Solution:
Push down the parent column of the corresponding struct type if any struct column is
present in the filter list.

This closes #2439


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/c82e3e85
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/c82e3e85
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/c82e3e85

Branch: refs/heads/master
Commit: c82e3e85f95f8c5af8a6ff92c980235435f79954
Parents: 806e9b5
Author: Indhumathi27 <indhumathim27@gmail.com>
Authored: Tue Jul 3 16:40:55 2018 +0530
Committer: kunal642 <kunalkapoor642@gmail.com>
Committed: Fri Jul 6 16:48:01 2018 +0530

----------------------------------------------------------------------
 .../complexType/TestComplexDataType.scala       |  21 ++-
 .../sql/CarbonDatasourceHadoopRelation.scala    | 137 ++++++++++---------
 .../strategy/CarbonLateDecodeStrategy.scala     |   2 +-
 3 files changed, 90 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/c82e3e85/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexDataType.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexDataType.scala
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexDataType.scala
index ab574c9..276ed30 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexDataType.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexDataType.scala
@@ -620,16 +620,14 @@ class TestComplexDataType extends QueryTest with BeforeAndAfterAll {
           Row(1, "abc", 2, "efg", Row(3, "mno", 4), 5))))
   }
 
-  test("ArrayofArray PushDown")
-  {
+  test("ArrayofArray PushDown") {
     sql("DROP TABLE IF EXISTS test")
     sql("create table test(a array<array<int>>) stored by 'carbondata'")
     sql("insert into test values(1) ")
     sql("select a[0][0] from test").show(false)
   }
 
-  test("Struct and ArrayofArray PushDown")
-  {
+  test("Struct and ArrayofArray PushDown") {
     sql("DROP TABLE IF EXISTS test")
     sql("create table test(a array<array<int>>,b struct<c:array<int>>)
stored by 'carbondata'")
     sql("insert into test values(1,1) ")
@@ -645,7 +643,7 @@ class TestComplexDataType extends QueryTest with BeforeAndAfterAll {
       "sno:array<int>,sal:array<double>,state:array<string>,date1:array<timestamp>>)
stored by " +
       "'carbondata'")
     sql("insert into test values('cus_01','1$2017/01/01$1:2$2.0:3.0$ab:ac$2018/01/01')")
-//    sql("select *from test").show(false)
+    //    sql("select *from test").show(false)
     sql(
       "select struct_of_array.state[0],count(distinct struct_of_array.id) as count_int,count"
+
       "(distinct struct_of_array.state[0]) as count_string from test group by struct_of_array"
+
@@ -656,4 +654,17 @@ class TestComplexDataType extends QueryTest with BeforeAndAfterAll {
         CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
   }
 
+  test("test struct complex type with filter") {
+    sql("DROP TABLE IF EXISTS test")
+    sql("create table test(id int,a struct<b:int,c:int>) stored by 'carbondata'")
+    sql("insert into test values(1,'2$3')")
+    sql("insert into test values(3,'5$3')")
+    sql("insert into test values(2,'4$5')")
+    checkAnswer(sql("select a.b from test where id=3"),Seq(Row(5)))
+    checkAnswer(sql("select a.b from test where a.c!=3"),Seq(Row(4)))
+    checkAnswer(sql("select a.b from test where a.c=3"),Seq(Row(5),Row(2)))
+    checkAnswer(sql("select a.b from test where id=1 or !a.c=3"),Seq(Row(4),Row(2)))
+    checkAnswer(sql("select a.b from test where id=3 or a.c=3"),Seq(Row(5),Row(2)))
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c82e3e85/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index 86e9ace..fa57960 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -69,6 +69,7 @@ case class CarbonDatasourceHadoopRelation(
   override def schema: StructType = tableSchema.getOrElse(carbonRelation.schema)
 
   def buildScan(requiredColumns: Array[String],
+      filterComplex: Seq[org.apache.spark.sql.catalyst.expressions.Expression],
       projects: Seq[NamedExpression],
       filters: Array[Filter],
       partitions: Seq[PartitionSpec]): RDD[InternalRow] = {
@@ -76,77 +77,85 @@ case class CarbonDatasourceHadoopRelation(
       CarbonFilters.createCarbonFilter(schema, filter)
     }.reduceOption(new AndExpression(_, _))
 
-    var parentColumn = new ListBuffer[String]
-    // In case of Struct or StructofStruct Complex type, get the project column for given
-    // parent/child field and pushdown the corresponding project column. In case of Array,
-    // ArrayofStruct or StructofArray, pushdown parent column
-    var reqColumns = projects.map {
-      case a@Alias(s: GetStructField, name) =>
-        var arrayTypeExists = false
-        var ifGetArrayItemExists = s
-        breakable({
-          while (ifGetArrayItemExists.containsChild != null) {
-            if (ifGetArrayItemExists.child.isInstanceOf[AttributeReference]) {
-              arrayTypeExists = s.childSchema.toString().contains("ArrayType")
-              break
-            } else {
-              if (ifGetArrayItemExists.child.isInstanceOf[GetArrayItem]) {
-                arrayTypeExists = true
+    val projection = new CarbonProjection
+
+    // As Filter pushdown for Complex datatype is not supported, if filter is applied on
complex
+    // column, then Projection pushdown on Complex Columns will not take effect. Hence, check
if
+    // filter contains Struct Complex Column.
+    val complexFilterExists = filterComplex.map(col =>
+      col.map(_.isInstanceOf[GetStructField]))
+
+    if (!complexFilterExists.exists(f => f.contains(true))) {
+      var parentColumn = new ListBuffer[String]
+      // In case of Struct or StructofStruct Complex type, get the project column for given
+      // parent/child field and pushdown the corresponding project column. In case of Array,
+      // ArrayofStruct or StructofArray, pushdown parent column
+      var reqColumns = projects.map {
+        case a@Alias(s: GetStructField, name) =>
+          var arrayTypeExists = false
+          var ifGetArrayItemExists = s
+          breakable({
+            while (ifGetArrayItemExists.containsChild != null) {
+              if (ifGetArrayItemExists.child.isInstanceOf[AttributeReference]) {
+                arrayTypeExists = s.childSchema.toString().contains("ArrayType")
                 break
-              }
-              else {
-                ifGetArrayItemExists = ifGetArrayItemExists.child.asInstanceOf[GetStructField]
+              } else {
+                if (ifGetArrayItemExists.child.isInstanceOf[GetArrayItem]) {
+                  arrayTypeExists = true
+                  break
+                } else {
+                  ifGetArrayItemExists = ifGetArrayItemExists.child.asInstanceOf[GetStructField]
+                }
               }
             }
+          })
+          if (!arrayTypeExists) {
+            parentColumn += s.toString().split("\\.")(0).replaceAll("#.*", "").toLowerCase
+            parentColumn = parentColumn.distinct
+            s.toString().replaceAll("#[0-9]*", "").toLowerCase
+          } else {
+            s.toString().split("\\.")(0).replaceAll("#.*", "").toLowerCase
           }
-        })
-        if (!arrayTypeExists) {
-          parentColumn += s.toString().split("\\.")(0).replaceAll("#.*", "").toLowerCase
-          parentColumn = parentColumn.distinct
-          s.toString().replaceAll("#[0-9]*", "").toLowerCase
-        } else {
+        case a@Alias(s: GetArrayItem, name) =>
           s.toString().split("\\.")(0).replaceAll("#.*", "").toLowerCase
-        }
-      case a@Alias(s: GetArrayItem, name) =>
-        s.toString().split("\\.")(0).replaceAll("#.*", "").toLowerCase
-      case attributeReference: AttributeReference =>
-        var columnName: String = attributeReference.name
-        requiredColumns.foreach(colName =>
-          if (colName.equalsIgnoreCase(attributeReference.name)) {
-            columnName = colName
-          })
-        columnName
-      case other =>
-        None
-    }
-
-    reqColumns = reqColumns.filter(col => !col.equals(None))
-    var output = new ListBuffer[String]
-
-    if (null != requiredColumns && requiredColumns.nonEmpty) {
-      requiredColumns.foreach(col => {
-
-        if (null != reqColumns && reqColumns.nonEmpty) {
-          reqColumns.foreach(reqCol => {
-            if (!reqCol.toString.equalsIgnoreCase(col) &&
-                !reqCol.toString.startsWith(col.toLowerCase + ".") &&
-                !parentColumn.contains(col.toLowerCase)) {
-              output += col
-            } else {
-              output += reqCol.toString
-            }
-          })
-        } else {
-          output += col
-        }
-        output = output.distinct
-      })
+        case attributeReference: AttributeReference =>
+          var columnName: String = attributeReference.name
+          requiredColumns.foreach(colName =>
+            if (colName.equalsIgnoreCase(attributeReference.name)) {
+              columnName = colName
+            })
+          columnName
+        case other =>
+          None
+      }
+
+      reqColumns = reqColumns.filter(col => !col.equals(None))
+      var output = new ListBuffer[String]
+
+      if (null != requiredColumns && requiredColumns.nonEmpty) {
+        requiredColumns.foreach(col => {
+
+          if (null != reqColumns && reqColumns.nonEmpty) {
+            reqColumns.foreach(reqCol => {
+              if (!reqCol.toString.equalsIgnoreCase(col) &&
+                  !reqCol.toString.startsWith(col.toLowerCase + ".") &&
+                  !parentColumn.contains(col.toLowerCase)) {
+                output += col
+              } else {
+                output += reqCol.toString
+              }
+            })
+          } else {
+            output += col
+          }
+          output = output.distinct
+        })
+      }
+      output.toArray.foreach(projection.addColumn)
+    } else {
+      requiredColumns.foreach(projection.addColumn)
     }
 
-    val projection = new CarbonProjection
-    output.toArray.foreach(projection.addColumn)
-
-
     CarbonSession.threadUnset(CarbonCommonConstants.SUPPORT_DIRECT_QUERY_ON_DATAMAP)
     val inputMetricsStats: CarbonInputMetrics = new CarbonInputMetrics
     new CarbonScanRDD(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c82e3e85/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
index 3926ff6..10ae86a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
@@ -64,7 +64,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
           projects,
           filters,
           (a, f, needDecoder, p) => toCatalystRDD(l, a, relation.buildScan(
-            a.map(_.name).toArray, projects, f, p), needDecoder)) :: Nil
+            a.map(_.name).toArray, filters, projects, f, p), needDecoder)) :: Nil
       case CarbonDictionaryCatalystDecoder(relations, profile, aliasMap, _, child) =>
         if ((profile.isInstanceOf[IncludeProfile] && profile.isEmpty) ||
             !CarbonDictionaryDecoder.


Mime
View raw message