carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gvram...@apache.org
Subject [1/2] incubator-carbondata git commit: Fixed exists query in carbon2.1
Date Sat, 18 Mar 2017 08:01:52 GMT
Repository: incubator-carbondata
Updated Branches:
  refs/heads/master 41ccba115 -> 6c17fad71


Fixed exists query in carbon2.1


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/248c5bb6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/248c5bb6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/248c5bb6

Branch: refs/heads/master
Commit: 248c5bb640c55c56982148c2435fc9026068287d
Parents: 41ccba1
Author: ravipesala <ravi.pesala@gmail.com>
Authored: Sat Mar 18 12:24:03 2017 +0530
Committer: Venkata Ramana G <ramana.gollamudi@huawei.com>
Committed: Sat Mar 18 13:20:07 2017 +0530

----------------------------------------------------------------------
 .../AbstractDetailQueryResultIterator.java      |  6 +++---
 .../CarbonDecoderOptimizerHelper.scala          |  6 +++++-
 .../sql/CarbonDatasourceHadoopRelation.scala    |  4 +++-
 .../spark/sql/CarbonDictionaryDecoder.scala     |  3 ++-
 .../spark/sql/hive/CarbonSessionState.scala     | 12 +++++++++--
 .../sql/optimizer/CarbonLateDecodeRule.scala    | 18 +++++++++++++----
 .../carbondata/CarbonDataSourceSuite.scala      | 21 ++++++++++++++++++++
 7 files changed, 58 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/248c5bb6/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
index a3d2fd2..4e5681c 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
@@ -180,14 +180,14 @@ public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterator
   }
 
   @Override public void close() {
+    if (null != dataBlockIterator) {
+      dataBlockIterator.close();
+    }
     try {
       fileReader.finish();
     } catch (IOException e) {
       LOGGER.error(e);
     }
-    if (null != dataBlockIterator) {
-      dataBlockIterator.close();
-    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/248c5bb6/integration/spark-common/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala
b/integration/spark-common/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala
index 0b390c4..021e866 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala
@@ -148,7 +148,11 @@ class CarbonPlanMarker {
         }
       }
     }
-    markerStack.peek().set
+    if (!markerStack.empty()) {
+      markerStack.peek().set
+    } else {
+      new util.HashSet[AttributeReferenceWrapper]()
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/248c5bb6/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index 1491cff..2743e7e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql
 
+import scala.collection.mutable.ArrayBuffer
+
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.command.LoadTableByInsert
@@ -40,7 +42,7 @@ case class CarbonDatasourceHadoopRelation(
     paths: Array[String],
     parameters: Map[String, String],
     tableSchema: Option[StructType],
-    var isSubquery: Boolean = false)
+    isSubquery: ArrayBuffer[Boolean] = new ArrayBuffer[Boolean]())
   extends BaseRelation with InsertableRelation {
 
   lazy val absIdentifier = AbsoluteTableIdentifier.fromTablePath(paths.head)

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/248c5bb6/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index 3916b1c..543da6f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -168,7 +168,8 @@ case class CarbonDictionaryDecoder(
             s"""
              |boolean $isNull = false;
             |byte[] $valueIntern = $dictsRef.getDictionaryValueForKeyInBytes(${ ev.value });
-             |if (java.util.Arrays.equals(org.apache.carbondata.core.constants
+             |if ($valueIntern == null ||
+             |  java.util.Arrays.equals(org.apache.carbondata.core.constants
              |.CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, $valueIntern)) {
              |  $isNull = true;
              |  $valueIntern = org.apache.carbondata.core.constants

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/248c5bb6/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
index 331efe9..d81fc09 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.hive
 
 import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, ExperimentalMethods, SparkSession}
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog
-import org.apache.spark.sql.catalyst.expressions.ScalarSubquery
+import org.apache.spark.sql.catalyst.expressions.{PredicateSubquery, ScalarSubquery}
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
@@ -60,10 +60,18 @@ class CarbonOptimizer(
             val tPlan = s.plan.transform {
               case lr: LogicalRelation
                 if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
-                lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery = true
+                lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
                 lr
             }
             ScalarSubquery(tPlan, s.children, s.exprId)
+          case p: PredicateSubquery =>
+            val tPlan = p.plan.transform {
+              case lr: LogicalRelation
+                if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+                lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
+                lr
+            }
+            PredicateSubquery(tPlan, p.children, p.nullAware, p.exprId)
         }
     }
     super.execute(transFormedPlan)

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/248c5bb6/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
index 8c38ec0..502b96a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
@@ -59,9 +59,9 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
     relations = collectCarbonRelation(plan)
     if (relations.nonEmpty && !isOptimized(plan)) {
       // In case scalar subquery skip the transformation and update the flag.
-      if (relations.exists(_.carbonRelation.isSubquery)) {
-        relations.foreach(p => p.carbonRelation.isSubquery = false)
-        LOGGER.info("Skip CarbonOptimizer for scalar sub query")
+      if (relations.exists(_.carbonRelation.isSubquery.nonEmpty)) {
+        relations.foreach(p => p.carbonRelation.isSubquery.remove(0))
+        LOGGER.info("Skip CarbonOptimizer for scalar/predicate sub query")
         return plan
       }
       LOGGER.info("Starting to optimize plan")
@@ -523,10 +523,20 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
         }
         Aggregate(grpExps, aggExps, agg.child)
       case expand: Expand =>
-        expand.transformExpressions {
+        val ex = expand.transformExpressions {
           case attr: AttributeReference =>
             updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
         }
+        // Update the datatype of literal type as per the output type, otherwise codegen fails.
+        val updatedProj = ex.projections.map { projs =>
+          projs.zipWithIndex.map { case(p, index) =>
+            p.transform {
+              case l: Literal if l.dataType != ex.output(index).dataType =>
+                Literal(l.value, ex.output(index).dataType)
+            }
+          }
+        }
+        Expand(updatedProj, ex.output, ex.child)
       case filter: Filter =>
         filter
       case j: Join =>

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/248c5bb6/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
b/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
index 5cd6ea0..b32a49d 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
@@ -137,4 +137,25 @@ class CarbonDataSourceSuite extends QueryTest with BeforeAndAfterAll {
       .addProperty("carbon.blockletgroup.size.in.mb", "64")
   }
 
+  test("exists function verification test")({
+    sql("drop table if exists carbonunion")
+    sql("drop table if exists sparkunion")
+    import sqlContext.implicits._
+    val df = sqlContext.sparkContext.parallelize(1 to 1000).map(x => (x+"", (x+100)+"")).toDF("c1", "c2")
+    df.registerTempTable("sparkunion")
+    df.write
+      .format("carbondata")
+      .mode(SaveMode.Overwrite)
+      .option("tableName", "carbonunion")
+      .save()
+    checkAnswer(
+      sql("select * from carbonunion where c1='200' and exists(select * from carbonunion)"),
+      sql("select * from sparkunion where c1='200' and exists(select * from sparkunion)"))
+    checkAnswer(
+      sql("select * from carbonunion where c1='200' and exists(select * from carbonunion) and exists(select * from carbonunion)"),
+      sql("select * from sparkunion where c1='200' and exists(select * from sparkunion) and exists(select * from sparkunion)"))
+    sql("drop table if exists sparkunion")
+    sql("drop table if exists carbonunion")
+  })
+
 }


Mime
View raw message