carbondata-commits mailing list archives

From ravipes...@apache.org
Subject [09/42] carbondata git commit: Spark 2x tupleId support for IUD Feature
Date Thu, 15 Jun 2017 11:50:14 GMT
Spark 2x tupleId support for IUD Feature


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/9e913e00
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/9e913e00
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/9e913e00

Branch: refs/heads/branch-1.1
Commit: 9e913e006ff06b6e16e92e0bb26504ab49f44cdc
Parents: 49e8b00
Author: nareshpr <prnaresh.naresh@gmail.com>
Authored: Mon May 15 13:10:08 2017 +0530
Committer: ravipesala <ravi.pesala@gmail.com>
Committed: Thu Jun 15 12:45:15 2017 +0530

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/CarbonEnv.scala  |  1 +
 .../execution/CarbonLateDecodeStrategy.scala    | 16 +++++++--
 .../sql/optimizer/CarbonLateDecodeRule.scala    | 36 ++++++++++++++++++--
 .../carbondata/query/SubQueryTestSuite.scala    |  4 +++
 4 files changed, 52 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e913e00/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index a286e56..b46488c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -43,6 +43,7 @@ class CarbonEnv {
   var initialized = false
 
   def init(sparkSession: SparkSession): Unit = {
+    sparkSession.udf.register("getTupleId", () => "")
     if (!initialized) {
       carbonMetastore = {
         val storePath =

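The one-line change above registers a no-op getTupleId UDF at session init (note it runs even before the initialized check, so re-initialization is harmless). The empty body is never meant to produce the value: the UDF is a marker that the Carbon planner rewrites into the implicit tupleId column, as the strategy and rule changes below show. A minimal usage sketch, assuming a SparkSession for which CarbonEnv.init has already run ("some_carbon_table" is a placeholder name):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("tupleid-sketch").getOrCreate()
    // getTupleId() returns "" if actually executed; during planning the
    // optimizer replaces the call with the implicit tupleId column.
    spark.sql("SELECT getTupleId() AS tupleId FROM some_carbon_table").show()
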
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e913e00/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
index 346e105..ac43a12 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
@@ -239,9 +239,20 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
         updateRequestedColumns.asInstanceOf[Seq[Attribute]])
       filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan)
     } else {
+
+      var newProjectList: Seq[Attribute] = Seq.empty
+      val updatedProjects = projects.map {
+          case a@Alias(s: ScalaUDF, name)
+            if name.equalsIgnoreCase(CarbonCommonConstants.POSITION_ID) ||
+                name.equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID) =>
+            val reference = AttributeReference(name, StringType, true)().withExprId(a.exprId)
+            newProjectList :+= reference
+            reference
+          case other => other
+      }
       // Don't request columns that are only referenced by pushed filters.
       val requestedColumns =
-      (projectSet ++ filterSet -- handledSet).map(relation.attributeMap).toSeq
+      (projectSet ++ filterSet -- handledSet).map(relation.attributeMap).toSeq ++ newProjectList
       val updateRequestedColumns = updateRequestedColumnsFunc(requestedColumns, table, needDecoder)
       val scan = getDataSourceScan(relation,
         updateRequestedColumns.asInstanceOf[Seq[Attribute]],
@@ -252,7 +263,8 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
         needDecoder,
         updateRequestedColumns.asInstanceOf[Seq[Attribute]])
       execution.ProjectExec(
-        updateRequestedColumnsFunc(projects, table, needDecoder).asInstanceOf[Seq[NamedExpression]],
+        updateRequestedColumnsFunc(updatedProjects, table,
+          needDecoder).asInstanceOf[Seq[NamedExpression]],
         filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan))
     }
   }

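In the hunk above, any projection of the form Alias(ScalaUDF, name) whose name matches POSITION_ID or CARBON_IMPLICIT_COLUMN_TUPLEID is swapped for a plain AttributeReference that keeps the alias's ExprId, and that attribute is appended to the requested columns so the scan itself supplies the value. A standalone restatement of the rewrite (a sketch against Spark 2.x catalyst internals; the literal "tupleId" stands in for the CarbonCommonConstants lookups):

    import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, NamedExpression, ScalaUDF}
    import org.apache.spark.sql.types.StringType

    // Replace each marker-UDF alias with a string attribute that reuses the
    // original ExprId, so operators above the scan still resolve against it.
    def rewriteTupleIdProjects(projects: Seq[NamedExpression]): Seq[NamedExpression] =
      projects.map {
        case a @ Alias(_: ScalaUDF, name) if name.equalsIgnoreCase("tupleId") =>
          AttributeReference(name, StringType, nullable = true)().withExprId(a.exprId)
        case other => other
      }
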
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e913e00/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
index aff34ea..fd6f14e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.optimizer
 import java.util
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.sql._
@@ -30,9 +29,10 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.command.RunnableCommand
 import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
+import org.apache.spark.sql.types.{IntegerType, StringType}
 
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.stats.QueryStatistic
 import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory
 import org.apache.carbondata.spark.{CarbonAliasDecoderRelation, CarbonFilters}
@@ -69,9 +69,10 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
         return plan
       }
       LOGGER.info("Starting to optimize plan")
+      val udfTransformedPlan = pushDownUDFToJoinLeftRelation(plan)
       val recorder = CarbonTimeStatisticsFactory.createExecutorRecorder("")
       val queryStatistic = new QueryStatistic()
-      val result = transformCarbonPlan(plan, relations)
+      val result = transformCarbonPlan(udfTransformedPlan, relations)
       queryStatistic.addStatistics("Time taken for Carbon Optimizer to optimize: ",
         System.currentTimeMillis)
       recorder.recordStatistics(queryStatistic)
@@ -83,6 +84,35 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
     }
   }
 
+  private def pushDownUDFToJoinLeftRelation(plan: LogicalPlan): LogicalPlan = {
+    val output = plan match {
+      case proj@Project(cols, Join(
+      left, right, jointype: org.apache.spark.sql.catalyst.plans.JoinType, condition)) =>
+        var projectionToBeAdded: Seq[org.apache.spark.sql.catalyst.expressions.Alias] = Seq.empty
+        val newCols = cols.map { col =>
+          col match {
+            case a@Alias(s: ScalaUDF, name)
+              if (name.equalsIgnoreCase(CarbonCommonConstants.POSITION_ID) ||
+                  name.equalsIgnoreCase(
+                    CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)) =>
+              projectionToBeAdded :+= a
+              AttributeReference(name, StringType, true)().withExprId(a.exprId)
+            case other => other
+          }
+        }
+        val newLeft = left match {
+          case Project(columns, logicalPlan) =>
+            Project(columns ++ projectionToBeAdded, logicalPlan)
+          case filter: Filter =>
+            Project(filter.output ++ projectionToBeAdded, filter)
+          case other => other
+        }
+        Project(newCols, Join(newLeft, right, jointype, condition))
+      case other => other
+    }
+    output
+  }
+
   def isOptimized(plan: LogicalPlan): Boolean = {
     plan find {
       case cd: CarbonDictionaryCatalystDecoder => true

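The new pushDownUDFToJoinLeftRelation handles the update-by-join shape produced by IUD: when the marker UDF sits in a Project directly above a Join, its alias is pushed into the left side's projection (the Carbon table whose rows are being identified), and the outer projection keeps only an attribute reference with the same ExprId. Schematically (a hand-drawn sketch, not verbatim optimizer output):

    Before:  Project [getTupleId() AS tupleId#1, ...]
             +- Join joinType, condition
                :- Project [...]              <- left: Carbon relation
                +- (right side)

    After:   Project [tupleId#1, ...]
             +- Join joinType, condition
                :- Project [..., getTupleId() AS tupleId#1]
                +- (right side)

Note the push-down only fires when the left child is already a Project or a Filter; any other left child, and any plan that is not a Project over a Join, passes through unchanged.
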
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e913e00/integration/spark2/src/test/scala/org/apache/spark/carbondata/query/SubQueryTestSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/query/SubQueryTestSuite.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/query/SubQueryTestSuite.scala
index f6ad961..ad56173 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/query/SubQueryTestSuite.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/query/SubQueryTestSuite.scala
@@ -56,6 +56,10 @@ class SubQueryTestSuite extends QueryTest with BeforeAndAfterAll {
     sql("drop table if exists anothertable")
   }
 
+  test("tupleId") {
+    checkExistence(sql("select getTupleId() as tupleId from subquery"), true, "0/0/0-0_batchno0-0-")
+  }
+
   override def afterAll() {
     sql("drop table if exists subquery")
   }

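The assertion pins only a stable prefix of the generated id: the tuple id is a path-like encoding of the row's physical location (the expected substring shows its segment and batchno components), while the trailing part carries load-specific values. A quick interactive check (a sketch; subquery is the table created in this suite's setup):

    // Prints one tuple id per row of the test table; the exact format is an
    // internal encoding and may differ across CarbonData versions.
    sql("select getTupleId() as tupleId from subquery").show(false)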
