carbondata-commits mailing list archives

From: jack...@apache.org
Subject: [08/12] carbondata git commit: [CARBONDATA-2242] Add Materialized View modules
Date: Fri, 09 Mar 2018 08:44:26 GMT
http://git-wip-us.apache.org/repos/asf/carbondata/blob/02fd7873/datamap/mv/plan/pom.xml
----------------------------------------------------------------------
diff --git a/datamap/mv/plan/pom.xml b/datamap/mv/plan/pom.xml
new file mode 100644
index 0000000..b40915a
--- /dev/null
+++ b/datamap/mv/plan/pom.xml
@@ -0,0 +1,158 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.carbondata</groupId>
+    <artifactId>carbondata-parent</artifactId>
+    <version>1.4.0-SNAPSHOT</version>
+    <relativePath>../../../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>carbondata-mv-plan</artifactId>
+  <name>Apache CarbonData :: Materialized View Plan</name>
+
+  <properties>
+    <dev.path>${basedir}/../../../dev</dev.path>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.carbondata</groupId>
+      <artifactId>carbondata-spark-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-hive_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.scalatest</groupId>
+      <artifactId>scalatest_${scala.binary.version}</artifactId>
+      <scope>provided</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <testSourceDirectory>src/test/scala</testSourceDirectory>
+    <!--<resources>
+      <resource>
+        <directory>src/resources</directory>
+      </resource>
+      <resource>
+        <directory>.</directory>
+      </resource>
+    </resources>-->
+    <plugins>
+      <plugin>
+        <groupId>org.scala-tools</groupId>
+        <artifactId>maven-scala-plugin</artifactId>
+        <version>2.15.2</version>
+        <executions>
+          <execution>
+            <id>compile</id>
+            <goals>
+              <goal>compile</goal>
+            </goals>
+            <phase>compile</phase>
+          </execution>
+          <execution>
+            <id>testCompile</id>
+            <goals>
+              <goal>testCompile</goal>
+            </goals>
+            <phase>test</phase>
+          </execution>
+          <execution>
+            <phase>process-resources</phase>
+            <goals>
+              <goal>compile</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <configuration>
+          <source>1.7</source>
+          <target>1.7</target>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <version>2.18</version>
+        <!-- Note config is repeated in scalatest config -->
+        <configuration>
+          <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
+          <argLine>-Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m</argLine>
+          <systemProperties>
+            <java.awt.headless>true</java.awt.headless>
+          </systemProperties>
+          <failIfNoTests>false</failIfNoTests>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.scalatest</groupId>
+        <artifactId>scalatest-maven-plugin</artifactId>
+        <version>1.0</version>
+        <!-- Note config is repeated in surefire config -->
+        <configuration>
+          <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
+          <junitxml>.</junitxml>
+          <filereports>CarbonTestSuite.txt</filereports>
+          <argLine> ${argLine} -ea -Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m
+          </argLine>
+          <stderr />
+          <environmentVariables>
+          </environmentVariables>
+          <systemProperties>
+            <java.awt.headless>true</java.awt.headless>
+          </systemProperties>
+        </configuration>
+        <executions>
+          <execution>
+            <id>test</id>
+            <goals>
+              <goal>test</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/02fd7873/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/dsl/package.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/dsl/package.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/dsl/package.scala
new file mode 100644
index 0000000..20b5e8a
--- /dev/null
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/dsl/package.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv
+
+import scala.language.implicitConversions
+
+import org.apache.spark.sql.catalyst._
+import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+
+import org.apache.carbondata.mv.plans._
+import org.apache.carbondata.mv.plans.modular.{JoinEdge, ModularPlan}
+import org.apache.carbondata.mv.plans.modular.Flags._
+import org.apache.carbondata.mv.plans.util._
+
+/**
+ * A collection of implicit conversions that create a DSL for constructing data structures
+ * for modular plans.
+ *
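+ * For example, a minimal sketch (assuming `fact: ModularPlan` and `a, b: NamedExpression`
+ * are already in scope):
+ * {{{
+ *   val plan = fact.select(a, b)(a, b)()(Map.empty)().groupBy(a)(a, b)()
+ * }}}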
+ */
+package object dsl {
+
+  // object plans {
+
+    implicit class DslModularPlan(val modularPlan: ModularPlan) {
+      def select(outputExprs: NamedExpression*)
+        (inputExprs: Expression*)
+        (predicateExprs: Expression*)
+        (aliasMap: Map[Int, String])
+        (joinEdges: JoinEdge*): ModularPlan = {
+        modular
+          .Select(
+            outputExprs,
+            inputExprs,
+            predicateExprs,
+            aliasMap,
+            joinEdges,
+            Seq(modularPlan),
+            NoFlags,
+            Seq.empty,
+            Seq.empty)
+      }
+
+      def groupBy(outputExprs: NamedExpression*)
+        (inputExprs: Expression*)
+        (predicateExprs: Expression*): ModularPlan = {
+        modular
+          .GroupBy(outputExprs, inputExprs, predicateExprs, None, modularPlan, NoFlags, Seq.empty)
+      }
+
+      def harmonize: ModularPlan = modularPlan.harmonized
+    }
+
+    implicit class DslModularPlans(val modularPlans: Seq[ModularPlan]) {
+      def select(outputExprs: NamedExpression*)
+        (inputExprs: Expression*)
+        (predicateList: Expression*)
+        (aliasMap: Map[Int, String])
+        (joinEdges: JoinEdge*): ModularPlan = {
+        modular
+          .Select(
+            outputExprs,
+            inputExprs,
+            predicateList,
+            aliasMap,
+            joinEdges,
+            modularPlans,
+            NoFlags,
+            Seq.empty,
+            Seq.empty)
+      }
+
+      def union(): ModularPlan = modular.Union(modularPlans, NoFlags, Seq.empty)
+    }
+
+    implicit class DslLogical2Modular(val logicalPlan: LogicalPlan) {
+      def resolveonly: LogicalPlan = analysis.SimpleAnalyzer.execute(logicalPlan)
+
+      def modularize: ModularPlan = modular.SimpleModularizer.modularize(logicalPlan).next
+
+      def optimize: LogicalPlan = BirdcageOptimizer.execute(logicalPlan)
+    }
+
+  // }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/02fd7873/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/expressions/modular/subquery.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/expressions/modular/subquery.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/expressions/modular/subquery.scala
new file mode 100644
index 0000000..cfe341a
--- /dev/null
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/expressions/modular/subquery.scala
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.expressions.modular
+
+import org.apache.spark.sql.catalyst.expressions.{AttributeSeq, AttributeSet, Expression, ExprId, LeafExpression, NamedExpression, OuterReference, PlanExpression, Predicate, Unevaluable}
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.types._
+
+import org.apache.carbondata.mv.plans.modular.ModularPlan
+
+/**
+ * A base interface for expressions that contain a [[ModularPlan]].
+ */
+abstract class ModularSubquery(
+    plan: ModularPlan,
+    children: Seq[Expression],
+    exprId: ExprId) extends PlanExpression[ModularPlan] {
+  override lazy val resolved: Boolean = childrenResolved && plan.resolved
+  override lazy val references: AttributeSet =
+    if (plan.resolved) {
+      super.references -- plan.outputSet
+    } else {
+      super.references
+    }
+
+  override def withNewPlan(plan: ModularPlan): ModularSubquery
+
+  override def semanticEquals(o: Expression): Boolean = {
+    o match {
+      case p: ModularSubquery =>
+        this.getClass.getName.equals(p.getClass.getName) && plan.sameResult(p.plan) &&
+        children.length == p.children.length &&
+        children.zip(p.children).forall(p => p._1.semanticEquals(p._2))
+      case _ => false
+    }
+  }
+
+  def canonicalize(attrs: AttributeSeq): ModularSubquery = {
+    // Normalize the outer references in the subquery plan.
+    val normalizedPlan = plan.transformAllExpressions {
+      case OuterReference(r) => OuterReference(QueryPlan.normalizeExprId(r, attrs))
+    }
+    withNewPlan(normalizedPlan).canonicalized.asInstanceOf[ModularSubquery]
+  }
+}
+
+/**
+ * A subquery that will return only one row and one column. This will be converted into a physical
+ * scalar subquery during planning.
+ *
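+ * For example (SQL, an illustrative correlated scalar subquery):
+ * {{{
+ *   SELECT  a.id,
+ *           (SELECT  max(b.value)
+ *            FROM    b
+ *            WHERE   b.id = a.id)
+ *   FROM    a
+ * }}}
+ *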
+ * Note: `exprId` is used to have a unique name in explain string output.
+ */
+case class ScalarModularSubquery(
+    plan: ModularPlan,
+    children: Seq[Expression] = Seq.empty,
+    exprId: ExprId = NamedExpression.newExprId)
+  extends ModularSubquery(plan, children, exprId) with Unevaluable {
+  override def dataType: DataType = plan.schema.fields.head.dataType
+
+  override def nullable: Boolean = true
+
+  override def withNewPlan(plan: ModularPlan): ScalarModularSubquery = copy(plan = plan)
+
+  override def toString: String = s"scalar-modular-subquery#${ exprId.id } $conditionString"
+
+  override lazy val canonicalized: Expression = {
+    ScalarModularSubquery(
+      plan.canonicalized,
+      children.map(_.canonicalized),
+      ExprId(0))
+  }
+}
+
+object ScalarModularSubquery {
+  def hasCorrelatedScalarSubquery(e: Expression): Boolean = {
+    e.find {
+      case s: ScalarModularSubquery => s.children.nonEmpty
+      case _ => false
+    }.isDefined
+  }
+}
+
+/**
+ * A [[ModularListQuery]] expression defines the query which we want to search in an IN
+ * subquery expression. It can only be used in conjunction with an IN expression.
+ *
+ * For example (SQL):
+ * {{{
+ *   SELECT  *
+ *   FROM    a
+ *   WHERE   a.id IN (SELECT  id
+ *                    FROM    b)
+ * }}}
+ */
+case class ModularListQuery(
+    plan: ModularPlan,
+    children: Seq[Expression] = Seq.empty,
+    exprId: ExprId = NamedExpression.newExprId)
+  extends ModularSubquery(plan, children, exprId) with Unevaluable {
+  override def dataType: DataType = plan.schema.fields.head.dataType
+
+  override def nullable: Boolean = false
+
+  override def withNewPlan(plan: ModularPlan): ModularListQuery = copy(plan = plan)
+
+  override def toString: String = s"modular-list#${ exprId.id } $conditionString"
+
+  override lazy val canonicalized: Expression = {
+    ModularListQuery(
+      plan.canonicalized,
+      children.map(_.canonicalized),
+      ExprId(0))
+  }
+}
+
+/**
+ * The [[ModularExists]] expression checks if a row exists in a subquery given some
+ * correlated condition.
+ *
+ * For example (SQL):
+ * {{{
+ *   SELECT  *
+ *   FROM    a
+ *   WHERE   EXISTS (SELECT  *
+ *                   FROM    b
+ *                   WHERE   b.id = a.id)
+ * }}}
+ */
+case class ModularExists(
+    plan: ModularPlan,
+    children: Seq[Expression] = Seq.empty,
+    exprId: ExprId = NamedExpression.newExprId)
+  extends ModularSubquery(plan, children, exprId) with Predicate with Unevaluable {
+  override def nullable: Boolean = false
+
+  override def withNewPlan(plan: ModularPlan): ModularExists = copy(plan = plan)
+
+  override def toString: String = s"modular-exists#${ exprId.id } $conditionString"
+
+  override lazy val canonicalized: Expression = {
+    ModularExists(
+      plan.canonicalized,
+      children.map(_.canonicalized),
+      ExprId(0))
+  }
+}
+
+/**
+ * A placeholder for the generated SQL of a subquery expression.
+ */
+case class SubqueryHolder(override val sql: String) extends LeafExpression with Unevaluable {
+  override def dataType: DataType = NullType
+
+  override def nullable: Boolean = true
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/02fd7873/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/AggregatePushDown.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/AggregatePushDown.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/AggregatePushDown.scala
new file mode 100644
index 0000000..1864757
--- /dev/null
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/AggregatePushDown.scala
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.plans.modular
+
+import scala.collection._
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, Divide, ExprId, Literal, NamedExpression}
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+
+trait AggregatePushDown { // self: ModularPlan =>
+
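+  /**
+   * Maps the position of each aggregate in `outputList` to a rewritten top-level
+   * aggregate plus the aggregate(s) to evaluate below it on the fact table;
+   * returns an empty map if any output expression cannot be pushed down.
+   */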
+  def findPushThroughAggregates(outputList: Seq[NamedExpression],
+      selAliasMap: AttributeMap[Attribute],
+      fact: ModularRelation): Map[Int, (NamedExpression, Seq[NamedExpression])] = {
+    var pushable = true
+    val map = scala.collection.mutable.Map[Int, (NamedExpression, Seq[NamedExpression])]()
+    outputList.zipWithIndex.foreach {
+      // TODO: find out if the first two cases below are really needed. Commented out for now.
+      //      case (attr: Attribute, i) if (fact.outputSet.contains(attr)) => pushable = false
+      //      case (alias: Alias, i) if (alias.child.isInstanceOf[Attribute] && fact.outputSet
+      // .contains(alias.child.asInstanceOf[Attribute])) => pushable = false
+      case (alias: Alias, i) if (alias.child.isInstanceOf[AggregateExpression]) =>
+        val res = transformAggregate(
+          alias.child
+            .asInstanceOf[AggregateExpression],
+          selAliasMap,
+          i,
+          fact,
+          map,
+          Some((alias.name, alias.exprId)))
+        if (res.isEmpty) {
+          pushable = false
+        }
+      case (agg: AggregateExpression, i) =>
+        val res = transformAggregate(
+          agg,
+          selAliasMap,
+          i,
+          fact,
+          map,
+          None)
+        if (res.isEmpty) {
+          pushable = false
+        }
+      case _ =>
+    }
+    if (!pushable) {
+      Map.empty[Int, (NamedExpression, Seq[NamedExpression])]
+    } else {
+      map
+    }
+  }
+
+  private def transformAggregate(aggregate: AggregateExpression,
+      selAliasMap: AttributeMap[Attribute],
+      ith: Int,
+      fact: ModularRelation,
+      map: scala.collection.mutable.Map[Int, (NamedExpression, Seq[NamedExpression])],
+      aliasInfo: Option[(String, ExprId)]) = {
+    aggregate match {
+      case cnt@AggregateExpression(Count(exprs), _, false, _) if (exprs.length == 1 && exprs(0)
+        .isInstanceOf[Attribute]) =>
+        val tAttr = selAliasMap.get(exprs(0).asInstanceOf[Attribute]).getOrElse(exprs(0))
+          .asInstanceOf[Attribute]
+        if (fact.outputSet.contains(tAttr)) {
+          val cnt1 = AggregateExpression(Count(tAttr), cnt.mode, false)
+          val alias = Alias(cnt1, cnt1.toString)()
+          val tSum = AggregateExpression(Sum(alias.toAttribute), cnt.mode, false, cnt.resultId)
+          val (name, id) = aliasInfo.getOrElse(("", NamedExpression.newExprId))
+          map += (ith -> (Alias(tSum, name)(exprId = id), Seq(alias)))
+        } else {
+          Map.empty[Int, (NamedExpression, Seq[NamedExpression])]
+        }
+      case cnt@AggregateExpression(Count(exprs), _, false, _) if (exprs.length == 1 && exprs(0)
+        .isInstanceOf[Literal]) =>
+        val cnt1 = AggregateExpression(Count(exprs(0)), cnt.mode, false)
+        val alias = Alias(cnt1, cnt1.toString)()
+        val tSum = AggregateExpression(Sum(alias.toAttribute), cnt.mode, false, cnt.resultId)
+        val (name, id) = aliasInfo.getOrElse(("", NamedExpression.newExprId))
+        map += (ith -> (Alias(tSum, name)(exprId = id), Seq(alias)))
+      case sum@AggregateExpression(Sum(expr), _, false, _) if (expr.isInstanceOf[Attribute]) =>
+        val tAttr = selAliasMap.get(expr.asInstanceOf[Attribute]).getOrElse(expr)
+          .asInstanceOf[Attribute]
+        if (fact.outputSet.contains(tAttr)) {
+          val sum1 = AggregateExpression(Sum(tAttr), sum.mode, false)
+          val alias = Alias(sum1, sum1.toString)()
+          val tSum = AggregateExpression(Sum(alias.toAttribute), sum.mode, false, sum.resultId)
+          val (name, id) = aliasInfo.getOrElse(("", NamedExpression.newExprId))
+          map += (ith -> (Alias(tSum, name)(exprId = id), Seq(alias)))
+        } else {
+          Map.empty[Int, (NamedExpression, Seq[NamedExpression])]
+        }
+      case sum@AggregateExpression(Sum(expr), _, false, _) if (expr.isInstanceOf[Literal]) =>
+        val sum1 = AggregateExpression(Sum(expr), sum.mode, false)
+        val alias = Alias(sum1, sum1.toString)()
+        val tSum = AggregateExpression(Sum(alias.toAttribute), sum.mode, false, sum.resultId)
+        val (name, id) = aliasInfo.getOrElse(("", NamedExpression.newExprId))
+        map += (ith -> (Alias(tSum, name)(exprId = id), Seq(alias)))
+      case max@AggregateExpression(Max(expr), _, false, _) if (expr.isInstanceOf[Attribute]) =>
+        val tAttr = selAliasMap.get(expr.asInstanceOf[Attribute]).getOrElse(expr)
+          .asInstanceOf[Attribute]
+        if (fact.outputSet.contains(tAttr)) {
+          val max1 = AggregateExpression(Max(tAttr), max.mode, false)
+          val alias = Alias(max1, max1.toString)()
+          val tMax = AggregateExpression(Max(alias.toAttribute), max.mode, false, max.resultId)
+          val (name, id) = aliasInfo.getOrElse(("", NamedExpression.newExprId))
+          map += (ith -> (Alias(tMax, name)(exprId = id), Seq(alias)))
+        } else {
+          Map.empty[Int, (NamedExpression, Seq[NamedExpression])]
+        }
+      case min@AggregateExpression(Min(expr), _, false, _) if (expr.isInstanceOf[Attribute]) =>
+        val tAttr = selAliasMap.get(expr.asInstanceOf[Attribute]).getOrElse(expr)
+          .asInstanceOf[Attribute]
+        if (fact.outputSet.contains(tAttr)) {
+          val min1 = AggregateExpression(Min(tAttr), min.mode, false)
+          val alias = Alias(min1, min1.toString)()
+          val tMin = AggregateExpression(Min(alias.toAttribute), min.mode, false, min.resultId)
+          val (name, id) = aliasInfo.getOrElse(("", NamedExpression.newExprId))
+          map += (ith -> (Alias(tMin, name)(exprId = id), Seq(alias)))
+        } else {
+          Map.empty[Int, (NamedExpression, Seq[NamedExpression])]
+        }
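+      // avg(x) is decomposed into sum(x) and count(x), both pushed to the fact
+      // table, then recombined as sum/count above the pushed-down aggregate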
+      case avg@AggregateExpression(Average(expr), _, false, _) if (expr
+        .isInstanceOf[Attribute]) =>
+        val tAttr = selAliasMap.get(expr.asInstanceOf[Attribute]).getOrElse(expr)
+          .asInstanceOf[Attribute]
+        if (fact.outputSet.contains(tAttr)) {
+          val savg = AggregateExpression(Sum(tAttr), avg.mode, false)
+          val cavg = AggregateExpression(Count(tAttr), avg.mode, false)
+          val sAvg = Alias(savg, savg.toString)()
+          val cAvg = Alias(cavg, cavg.toString)()
+          val tAvg = Divide(sAvg.toAttribute, cAvg.toAttribute)
+          val (name, id) = aliasInfo.getOrElse(("", NamedExpression.newExprId))
+          map += (ith -> (Alias(tAvg, name)(exprId = id), Seq(sAvg, cAvg)))
+        } else {
+          Map.empty[Int, (NamedExpression, Seq[NamedExpression])]
+        }
+      case _ => Map.empty[Int, (NamedExpression, Seq[NamedExpression])]
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/02fd7873/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Flags.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Flags.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Flags.scala
new file mode 100644
index 0000000..2b9d8cf
--- /dev/null
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Flags.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.plans.modular
+
+trait Flags {
+  // each query allows only one of each of the following keywords
+  final val DISTINCT = 1L << 0
+  final val LIMIT = 1L << 1
+  final val SORT = 1L << 2
+  final val GLOBAL = 1L << 3
+  final val LOCAL = 1L << 4
+  final val EXPAND = 1L << 5
+
+  // to determine which keyword each Seq[Expression] in varargs belongs to
+  //  final val SortLimit = SORT | LIMIT
+
+  def flagToString(flag: Long): String = {
+    flag match {
+      case DISTINCT => "DISTINCT"
+      case LIMIT => "LIMIT"
+    }
+  }
+
+  // List of the raw flags that have expressions as arguments
+  // TODO: add EXPAND
+  private def pickledWithExpressions = {
+    Array[Long](SORT, LIMIT, EXPAND)
+  }
+
+  final val MaxBitPosition = 6
+
+  final val pickledListOrder: List[Long] = {
+    val all = 0 to MaxBitPosition map (1L << _)
+    all.toList filter (pickledWithExpressions contains _)
+  }
+  final val rawFlagPickledOrder: Array[Long] = pickledListOrder.toArray
+
+  type FlagSet = Long
+
+  val NoFlags: FlagSet = 0L
+
+  implicit class FlagSetUtils(var flags: FlagSet) {
+    def hasFlag(mask: Long): Boolean = (flags & mask) != 0L
+
+    def hasFlag(mask: Int): Boolean = hasFlag(mask.toLong)
+
+    def setFlag(mask: Long): FlagSet = { flags |= mask; flags }
+
+    def resetFlag(mask: Long): FlagSet = { flags &= ~mask; flags }
+
+    def initFlags(mask: Long): FlagSet = { flags = mask; flags }
+  }
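+
+  // For example (illustrative): NoFlags.setFlag(DISTINCT).hasFlag(DISTINCT) == true,
+  // and NoFlags.setFlag(SORT).resetFlag(SORT) == NoFlags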
+
+}
+
+object Flags extends Flags

http://git-wip-us.apache.org/repos/asf/carbondata/blob/02fd7873/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Harmonizer.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Harmonizer.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Harmonizer.scala
new file mode 100644
index 0000000..0843b48
--- /dev/null
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Harmonizer.scala
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.plans.modular
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.rules._
+import org.apache.spark.sql.internal.SQLConf
+
+import org.apache.carbondata.mv.plans
+import org.apache.carbondata.mv.plans._
+import org.apache.carbondata.mv.plans.modular.Flags._
+
+abstract class Harmonizer(conf: SQLConf)
+  extends RuleExecutor[ModularPlan] {
+
+  //  protected val fixedPoint = FixedPoint(conf.getConfString("spark.mv.harmonizer
+  // .maxIterations").toInt)
+  protected val fixedPoint = FixedPoint(conf.optimizerMaxIterations)
+
+  def batches: Seq[Batch] = {
+    Batch(
+      "Data Harmonizations", fixedPoint,
+      HarmonizeDimensionTable,
+      HarmonizeFactTable) :: Nil
+  }
+}
+
+/**
+ * A default Harmonizer
+ */
+object DefaultHarmonizer extends DefaultHarmonizer
+
+class DefaultHarmonizer extends Harmonizer(new SQLConf())
+
+object HarmonizeDimensionTable extends Rule[ModularPlan] with PredicateHelper {
+
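+  // Rewrites a star-join Select so that each dimension-side GroupBy(Select(dim))
+  // is wrapped as a HarmonizedRelation, with the dimension's evaluable predicates
+  // pulled up into the enclosing Select's predicate list.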
+  def apply(plan: ModularPlan): ModularPlan = {
+    plan transform {
+      case s@Select(_, _, _, _, jedges, fact :: dims, _, _, _) if
+      jedges.forall(e => e.joinType == LeftOuter || e.joinType == Inner) &&
+      fact.isInstanceOf[ModularRelation] &&
+      dims.exists(!_.isInstanceOf[modular.LeafNode]) &&
+      dims.forall(d => (d.isInstanceOf[ModularRelation] || HarmonizedRelation.canHarmonize(d))) => {
+        var tPullUpPredicates = Seq.empty[Expression]
+        val tChildren = fact :: dims.map {
+          case m: ModularRelation => m
+          case h@GroupBy(
+          _,
+          _,
+          _,
+          _,
+          s1@Select(_, _, _, _, _, dim :: Nil, NoFlags, Nil, Nil),
+          NoFlags,
+          Nil) if (dim.isInstanceOf[ModularRelation]) => {
+            val rAliasMap = AttributeMap(h.outputList
+              .collect { case a: Alias => (a.child.asInstanceOf[Attribute], a.toAttribute) })
+            val pullUpPredicates = s1.predicateList
+              .map(replaceAlias(_, rAliasMap.asInstanceOf[AttributeMap[Expression]]))
+            if (pullUpPredicates.forall(cond => canEvaluate(cond, h))) {
+              tPullUpPredicates = tPullUpPredicates ++ pullUpPredicates
+              plans.modular.HarmonizedRelation(h.copy(child = s1.copy(predicateList = Nil)))
+            } else {
+              h
+            }
+          }
+          // case _ =>
+        }
+        if (tChildren.forall(_.isInstanceOf[modular.LeafNode])) {
+          s.copy(predicateList = s.predicateList ++ tPullUpPredicates, children = tChildren)
+        } else {
+          s
+        }
+      }
+      //        s.withNewChildren(fact :: dims.map { case m: modular.ModularRelation => m; case h
+      // => HarmonizedRelation(h) })}
+      //        s.copy(predicateList = predicateList ++ moveUpPredicates, children = tChildren)}
+      // fact :: dims.map { case m: modular.ModularRelation => m; case h => HarmonizedRelation(h)
+      // })}
+    }
+  }
+
+}
+
+object HarmonizeFactTable extends Rule[ModularPlan] with PredicateHelper with AggregatePushDown {
+
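+  // Pre-aggregates the fact table below its joins with the dimension tables:
+  // aggregates that can be pushed (see findPushThroughAggregates) are evaluated in
+  // a generated "gen_harmonized_<db>_<table>" sub-plan placed under the join.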
+  def apply(plan: ModularPlan): ModularPlan = {
+    plan transform {
+      case g@GroupBy(_, _, _, _, s@Select(_, _, _, aliasm, jedges, fact :: dims, _, _, _), _, _) if
+      s.adjacencyList.keySet.size <= 1 &&
+      jedges.forall(e => e.joinType == Inner) && // !s.flags.hasFlag(DISTINCT) &&
+      fact.isInstanceOf[ModularRelation] &&
+      (fact :: dims).forall(_.isInstanceOf[modular.LeafNode]) &&
+      dims.nonEmpty => {
+        val selAliasMap = AttributeMap(s.outputList.collect {
+          case a: Alias if (a.child.isInstanceOf[Attribute]) => (a.toAttribute, a.child
+            .asInstanceOf[Attribute])
+        })
+        val aggTransMap = findPushThroughAggregates(
+          g.outputList,
+          selAliasMap,
+          fact.asInstanceOf[ModularRelation])
+
+        val constraintsAttributeSet = dims.flatMap(s.extractEvaluableConditions(_))
+          .map(_.references)
+          .foldLeft(AttributeSet.empty)(_ ++ _)
+        val groupingAttributeSet = g.predicateList.map(_.references)
+          .foldLeft(AttributeSet.empty)(_ ++ _)
+        if (aggTransMap.isEmpty ||
+            // TODO: the following condition is too pessimistic, more work needed using methods
+            // similar to those in trait
+            //      QueryPlanConstraints
+            !constraintsAttributeSet.subsetOf(groupingAttributeSet)) {
+          g
+        } else {
+          val starJExprs = dims.flatMap(dim => s.extractJoinConditions(fact, dim)).toSeq
+          val gJAttributes = starJExprs.map(expr => expr.references)
+            .foldLeft(AttributeSet.empty)(_ ++ _).filter(fact.outputSet.contains(_))
+          val fExprs = s.extractEvaluableConditions(fact)
+          val gFAttributes = fExprs.map(expr => expr.references)
+            .foldLeft(AttributeSet.empty)(_ ++ _)
+            .filter(fact.outputSet.contains(_))
+          val gGAttributes = g.predicateList.map(expr => expr.references)
+            .foldLeft(AttributeSet.empty)(_ ++ _).filter(fact.outputSet.contains(_))
+          val gAttributes = (gJAttributes ++ gFAttributes ++ gGAttributes).toSeq
+
+          val oAggregates = aggTransMap.map(_._2).flatMap(_._2).toSeq
+
+          val tAliasMap = (aliasm.get(0) match {
+            case Some(name) => Seq((0, name))
+            case _ => Seq.empty
+          }).toMap
+          val sOutput = (oAggregates.map(_.references).foldLeft(AttributeSet.empty)(_ ++ _) ++
+                         AttributeSet(gAttributes)).toSeq
+          val hFactSel = plans.modular
+            .Select(
+              sOutput,
+              fact.output,
+              Seq.empty,
+              tAliasMap,
+              Seq.empty,
+              fact :: Nil,
+              NoFlags,
+              Seq.empty,
+              Seq.empty)
+          val hFact = plans.modular
+            .GroupBy(
+              gAttributes ++ oAggregates,
+              sOutput,
+              gAttributes,
+              None,
+              hFactSel,
+              NoFlags,
+              Seq.empty)
+          val hFactName = s"gen_harmonized_${
+            fact.asInstanceOf[ModularRelation]
+              .databaseName
+          }_${ fact.asInstanceOf[ModularRelation].tableName }"
+          val hAliasMap = (aliasm - 0) + (0 -> hFactName)
+          val hInputList = gAttributes ++ oAggregates.map(_.toAttribute) ++
+                           dims.flatMap(_.asInstanceOf[modular.LeafNode].output).toSeq
+          // val hPredicateList = s.predicateList
+          val attrOutputList = s.outputList.filter(expr => (expr.isInstanceOf[Attribute]) ||
+                                                           (expr.isInstanceOf[Alias] &&
+                                                            expr.asInstanceOf[Alias].child
+                                                              .isInstanceOf[Attribute]))
+          val aggOutputList = aggTransMap.values.flatMap(t => t._2)
+            .map { ref =>
+              AttributeReference(ref.name, ref.dataType)(
+                exprId = ref.exprId,
+                qualifier = Some(hFactName))
+            }
+          val hOutputList = attrOutputList ++ aggOutputList
+          val hSel = s
+            .copy(
+              outputList = hOutputList,
+              inputList = hInputList,
+              aliasMap = hAliasMap,
+              children = hFact :: dims)
+          val gOutputList = g.outputList.zipWithIndex
+            .map { case (expr, index) =>
+              if (aggTransMap.keySet.contains(index)) {
+                aggTransMap(index)
+                  ._1
+              } else {
+                expr
+              }
+            }
+
+          val wip = g.copy(outputList = gOutputList, inputList = hInputList, child = hSel)
+          val hFactOutputSet = hFact.outputSet
+          wip.transformExpressions {
+            case ref: Attribute if hFactOutputSet.contains(ref) =>
+              AttributeReference(ref.name, ref.dataType)(
+                exprId = ref.exprId,
+                qualifier = Some(hFactName))
+          }
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/02fd7873/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPatterns.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPatterns.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPatterns.scala
new file mode 100644
index 0000000..b00ea28
--- /dev/null
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPatterns.scala
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.plans.modular
+
+import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression, PredicateHelper, _}
+import org.apache.spark.sql.catalyst.plans.logical.{Distinct, Limit, LogicalPlan, Window}
+
+import org.apache.carbondata.mv.plans.{Pattern, _}
+import org.apache.carbondata.mv.plans.modular.Flags._
+import org.apache.carbondata.mv.plans.util.{ExtractGroupByModule, ExtractSelectModule, ExtractSelectModuleForWindow, ExtractTableModule, ExtractUnionModule}
+
+object SimpleModularizer extends ModularPatterns {
+  def patterns: Seq[Pattern] = {
+    SelectModule ::
+    GroupByModule ::
+    UnionModule ::
+    DataSourceModule :: Nil
+  }
+
+  override protected def collectPlaceholders(plan: ModularPlan): Seq[(ModularPlan, LogicalPlan)] = {
+    plan.collect {
+      case placeholder@ModularizeLater(logicalPlan) => placeholder -> logicalPlan
+    }
+  }
+
+  override protected def prunePlans(plans: Iterator[ModularPlan]): Iterator[ModularPlan] = {
+    plans
+    //    plans.filter(_.collect { case n if n.subqueries.nonEmpty => n }.isEmpty)
+    // TODO: find out why the following statement does not work
+    //    plans.filter(_.find { case n if n.subqueries.nonEmpty => true }.isEmpty)
+  }
+
+  override protected def makeupAliasMappings(
+      plans: Iterator[ModularPlan]): Iterator[ModularPlan] = {
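+    // Fills in missing aliasMap entries for GroupBy-over-Select: each child index
+    // is mapped to the qualifier of the qualified output attribute it produces.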
+    def makeup(plan: ModularPlan): ModularPlan = {
+      plan transform {
+        case g@GroupBy(_, _, _, _, s@Select(_, _, _, aliasmap, _, children, _, _, _), _, _) =>
+          val aq = AttributeSet(g.outputList).filter(_.qualifier.nonEmpty)
+          val makeupmap = children.zipWithIndex.flatMap {
+            case (child, i) =>
+              aq.find(child.outputSet.contains(_)).map(_.qualifier).flatten.map((i, _))
+          }.toMap
+          g.copy(child = s.copy(aliasMap = makeupmap ++ aliasmap))
+      }
+    }
+
+    plans.map(makeup)
+  }
+}
+
+abstract class ModularPattern extends GenericPattern[ModularPlan] {
+
+  override protected def modularizeLater(plan: LogicalPlan): ModularPlan = ModularizeLater(plan)
+}
+
+case class ModularizeLater(plan: LogicalPlan) extends LeafNode {
+  override def output: Seq[Attribute] = plan.output
+}
+
+abstract class ModularPatterns extends Modularizer[ModularPlan] {
+
+  //  self: MQOContext#SparkyModeler =>
+
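+  // The patterns below map logical-plan shapes onto modular operators; Distinct
+  // and Limit do not become separate nodes but are folded into each module's
+  // FlagSet, with the limit expression recorded in the flag spec.
+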
+  object SelectModule extends Pattern with PredicateHelper {
+
+    private[this] def makeSelectModule(
+        output: Seq[NamedExpression],
+        input: Seq[Expression],
+        predicate: Seq[Expression],
+        aliasmap: Map[Int, String],
+        joinedge: Seq[JoinEdge],
+        flags: FlagSet,
+        children: Seq[ModularPlan],
+        flagSpec: Seq[Seq[Any]],
+        windowSpec: Seq[Seq[Any]]) = {
+      Seq(Select(
+        output,
+        input,
+        predicate,
+        aliasmap,
+        joinedge,
+        children,
+        flags,
+        flagSpec,
+        windowSpec))
+    }
+
+    def apply(plan: LogicalPlan): Seq[ModularPlan] = {
+      plan match {
+        case Distinct(
+          ExtractSelectModule(output, input, predicate, aliasmap, joinedge, children, flags1,
+          fspec1, wspec)) =>
+          val flags = flags1.setFlag(DISTINCT)
+          makeSelectModule(output, input, predicate, aliasmap, joinedge, flags,
+            children.map(modularizeLater), fspec1, wspec)
+
+        case Limit(
+          limitExpr,
+          Distinct(
+            ExtractSelectModule(output, input, predicate, aliasmap, joinedge, children,
+              flags1, fspec1, wspec))) =>
+          val flags = flags1.setFlag(DISTINCT).setFlag(LIMIT)
+          makeSelectModule(output, input, predicate, aliasmap, joinedge, flags,
+            children.map(modularizeLater), Seq(Seq(limitExpr)) ++ fspec1, wspec)
+
+        case Limit(
+          limitExpr,
+          ExtractSelectModule(output, input, predicate, aliasmap, joinedge, children, flags1,
+            fspec1, wspec)) =>
+          val flags = flags1.setFlag(LIMIT)
+          makeSelectModule(output, input, predicate, aliasmap, joinedge, flags,
+            children.map(modularizeLater), Seq(Seq(limitExpr)) ++ fspec1, wspec)
+
+        case ExtractSelectModule(output, input, predicate, aliasmap, joinedge, children, flags1,
+        fspec1, wspec) =>
+          makeSelectModule(output, input, predicate, aliasmap, joinedge, flags1,
+            children.map(modularizeLater), fspec1, wspec)
+
+        case Window(exprs, _, _,
+          ExtractSelectModuleForWindow(output, input, predicate, aliasmap, joinedge, children,
+            flags1, fspec1, wspec)) =>
+          val sel1 = makeSelectModule(output, input, predicate, aliasmap, joinedge, flags1,
+            children.map(modularizeLater), fspec1, wspec)
+          makeSelectModule(
+            output.map(_.toAttribute),
+            output.map(_.toAttribute),
+            Seq.empty,
+            Map.empty,
+            Seq.empty,
+            NoFlags,
+            sel1,
+            Seq.empty,
+            Seq(Seq(exprs)) ++ wspec)
+
+        case _ => Nil
+      }
+    }
+  }
+
+  object GroupByModule extends Pattern with PredicateHelper {
+
+    private[this] def makeGroupByModule(
+        output: Seq[NamedExpression],
+        input: Seq[Expression],
+        predicate: Seq[Expression],
+        flags: FlagSet,
+        alias: Option[String],
+        child: ModularPlan,
+        fspec: Seq[Seq[Any]]) = {
+      val groupby = Some(GroupBy(output, input, predicate, alias, child, flags, fspec))
+      groupby.map(Seq(_)).getOrElse(Nil)
+    }
+
+    def apply(plan: LogicalPlan): Seq[ModularPlan] = {
+      plan match {
+        case Limit(
+        limitExpr,
+        ExtractGroupByModule(output, input, predicate, alias, child, flags1, fspec1)) =>
+          val flags = flags1.setFlag(LIMIT)
+          makeGroupByModule(
+            output,
+            input,
+            predicate,
+            flags,
+            alias,
+            modularizeLater(child),
+            Seq(Seq(limitExpr)) ++ fspec1)
+        case ExtractGroupByModule(output, input, predicate, alias, child, flags1, fspec1) =>
+          makeGroupByModule(output, input, predicate, flags1, alias, modularizeLater(child), fspec1)
+        case _ => Nil
+      }
+    }
+  }
+
+  object UnionModule extends Pattern with PredicateHelper {
+
+    private[this] def makeUnionModule(
+        flags: FlagSet,
+        children: Seq[ModularPlan],
+        fspec: Seq[Seq[Any]]) = {
+      Seq(modular.Union(children, flags, fspec))
+    }
+
+    def apply(plan: LogicalPlan): Seq[ModularPlan] = {
+      plan match {
+        case Distinct(ExtractUnionModule(children, flags1, fspec1)) =>
+          val flags = flags1.setFlag(DISTINCT)
+          makeUnionModule(flags, children.map(modularizeLater), fspec1)
+        case Limit(limitExpr, Distinct(ExtractUnionModule(children, flags1, fspec1))) =>
+          val flags = flags1.setFlag(DISTINCT).setFlag(LIMIT)
+          makeUnionModule(flags, children.map(modularizeLater), Seq(Seq(limitExpr)) ++ fspec1)
+        case Limit(limitExpr, ExtractUnionModule(children, flags1, fspec1)) =>
+          val flags = flags1.setFlag(LIMIT)
+          makeUnionModule(flags, children.map(modularizeLater), Seq(Seq(limitExpr)) ++ fspec1)
+        case ExtractUnionModule(children, flags1, fspec1) =>
+          makeUnionModule(flags1, children.map(modularizeLater), fspec1)
+        case _ => Nil
+      }
+    }
+  }
+
+  object DataSourceModule extends Pattern with Flags with PredicateHelper {
+
+    private[this] def makeDataSourceModule(
+        databaseName: String,
+        tableName: String,
+        output: Seq[NamedExpression],
+        flags: FlagSet,
+        fspec: Seq[Seq[Any]]) = {
+      Seq(ModularRelation(databaseName, tableName, output, flags, fspec))
+    }
+
+    def apply(plan: LogicalPlan): Seq[ModularPlan] = {
+      plan match {
+        case ExtractTableModule(databaseName, tableName, output, Nil, flags1, fspec1) =>
+          makeDataSourceModule(databaseName, tableName, output, flags1, fspec1)
+        case _ => Nil
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/02fd7873/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPlan.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPlan.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPlan.scala
new file mode 100644
index 0000000..b672a74
--- /dev/null
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPlan.scala
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.plans.modular
+
+import scala.collection._
+import scala.collection.mutable.{HashMap, MultiMap}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.{Expression, PredicateHelper}
+import org.apache.spark.sql.catalyst.plans.{JoinType, QueryPlan}
+import org.apache.spark.sql.catalyst.plans.logical.Statistics
+import org.apache.spark.sql.catalyst.trees.TreeNode
+import org.apache.spark.sql.internal.SQLConf
+
+import org.apache.carbondata.mv.plans._
+import org.apache.carbondata.mv.plans.util.{Printers, Signature, SQLBuilder}
+
+abstract class ModularPlan
+  extends QueryPlan[ModularPlan]
+    with AggregatePushDown
+    with Logging
+    with Serializable
+    with PredicateHelper with Printers {
+
+  /**
+   * The first two members below exist to support sub-query expressions.
+   */
+
+  lazy val resolved: Boolean = expressions.forall(_.resolved) && childrenResolved
+
+  def childrenResolved: Boolean = children.forall(_.resolved)
+
+  private var statsCache: Option[Statistics] = None
+
+  final def stats(spark: SparkSession, conf: SQLConf): Statistics = {
+    statsCache.getOrElse {
+      statsCache = Some(computeStats(spark, conf))
+      statsCache.get
+    }
+  }
+
+  final def invalidateStatsCache(): Unit = {
+    statsCache = None
+    children.foreach(_.invalidateStatsCache())
+  }
+
+  protected def computeStats(spark: SparkSession, conf: SQLConf): Statistics = {
+    //    spark.conf.set("spark.sql.cbo.enabled", true)
+    val sqlStmt = asOneLineSQL
+    val plan = spark.sql(sqlStmt).queryExecution.optimizedPlan
+    plan.stats(conf)
+  }
+
+  override def fastEquals(other: TreeNode[_]): Boolean = {
+    this.eq(other)
+  }
+
+  private var _rewritten: Boolean = false
+
+  /**
+   * Marks this plan as already rewritten.
+   */
+  private[mv] def setRewritten(): ModularPlan = {
+    _rewritten = true
+    children.foreach(_.setRewritten())
+    this
+  }
+
+  /**
+   * Returns true if this node and its children have already gone through query rewrite.
+   * Note that this is only an optimization used to avoid rewriting trees that have already
+   * been rewritten, and can be reset by transformations.
+   */
+  def rewritten: Boolean = {
+    _rewritten
+  }
+
+  private var _skip: Boolean = false
+
+  private[mv] def setSkip(): ModularPlan = {
+    _skip = true
+    children.foreach(_.setSkip())
+    this
+  }
+
+  private[mv] def resetSkip(): ModularPlan = {
+    _skip = false
+    children.foreach(_.resetSkip())
+    this
+  }
+
+  def skip: Boolean = _skip
+
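+  // True for plans of SPJGH shape (presumably Select-Project-Join-GroupBy-Having):
+  // at most one GroupBy over a Select whose children are all leaf nodes.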
+  def isSPJGH: Boolean = {
+    this match {
+      case modular.Select(_, _, _, _, _,
+        Seq(modular.GroupBy(_, _, _, _, sel_c2@modular.Select(_, _, _, _, _, _, _, _, _), _, _)),
+        _, _, _) if sel_c2.children.forall(_.isInstanceOf[modular.LeafNode]) => true
+
+      case modular.GroupBy(_, _, _, _, sel_c2@modular.Select(_, _, _, _, _, _, _, _, _), _, _)
+        if sel_c2.children.forall(_.isInstanceOf[modular.LeafNode]) => true
+
+      case modular.Select(_, _, _, _, _, children, _, _, _)
+        if children.forall(_.isInstanceOf[modular.LeafNode]) => true
+
+      case _ => false
+    }
+  }
+
+  def signature: Option[Signature] = ModularPlanSignatureGenerator.generate(this)
+
+  def createMutableAdjacencyList(
+      edges: Seq[JoinEdge]
+  ): mutable.HashMap[Int, mutable.Set[(Int, JoinType)]] with mutable.MultiMap[Int, (Int, JoinType)]
+  = {
+    val mm = new HashMap[Int, mutable.Set[(Int, JoinType)]] with MultiMap[Int, (Int, JoinType)]
+    for (edge <- edges) { mm.addBinding(edge.left, (edge.right, edge.joinType)) }
+    mm
+  }
+
+  def createImmutableAdjacencyList(edges: Seq[JoinEdge]): Predef.Map[Int, Seq[(Int, JoinType)]] = {
+    edges.groupBy { _.left }.map { case (k, v) => (k, v.map(e => (e.right, e.joinType))) }
+  }
+
+  def adjacencyList: Map[Int, Seq[(Int, JoinType)]] = Map.empty
+
+  def extractJoinConditions(left: ModularPlan, right: ModularPlan): Seq[Expression] = Seq.empty
+
+  def extractRightEvaluableConditions(left: ModularPlan, right: ModularPlan): Seq[Expression] =
+    Seq.empty
+
+  def extractEvaluableConditions(plan: ModularPlan): Seq[Expression] = Seq.empty
+
+  def asCompactSQL: String = asCompactString(new SQLBuilder(this).fragmentExtract)
+
+  def asOneLineSQL: String = asOneLineString(new SQLBuilder(this).fragmentExtract)
+
+  // for plans without sub-query expressions only
+  def asOneLineSQL(subqueryPrefix: String): String = {
+    asOneLineString(new SQLBuilder(
+      this,
+      subqueryPrefix).fragmentExtract)
+  }
+
+  /**
+   * Returns a plan for which a best-effort attempt has been made to transform `this` in a way
+   * that preserves the result but replaces each harmonized dimension table with a
+   * HarmonizedRelation and the fact table with a sub-plan that pre-aggregates the table before
+   * the join with the dimension tables.
+   *
+   * Some nodes should override this to provide proper harmonization logic.
+   */
+  lazy val harmonized: ModularPlan = DefaultHarmonizer.execute(preHarmonized)
+
+  /**
+   * Does some simple transformation on this plan before harmonizing. Implementations can
+   * override this method to provide customized harmonization logic without rewriting the
+   * whole flow.
+   *
+   * We assume queries that need to be harmonized are of the form:
+   *
+   * FACT (left) join (harmonized) DIM1 (left) join (harmonized) DIM2 ...
+   *
+   * For queries not of this form, override this method to transform them into this form.
+   */
+  protected def preHarmonized: ModularPlan = {
+    this
+  }
+}
+
+object ModularPlan extends PredicateHelper {
+
+}
+
+abstract class LeafNode extends ModularPlan {
+  override def children: Seq[ModularPlan] = Nil
+}
+
+abstract class UnaryNode extends ModularPlan {
+  def child: ModularPlan
+
+  override def children: Seq[ModularPlan] = child :: Nil
+}
+
+abstract class BinaryNode extends ModularPlan {
+  def left: ModularPlan
+
+  def right: ModularPlan
+
+  override def children: Seq[ModularPlan] = Seq(left, right)
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/02fd7873/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPlanSignatureGenerator.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPlanSignatureGenerator.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPlanSignatureGenerator.scala
new file mode 100644
index 0000000..0e292ec
--- /dev/null
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPlanSignatureGenerator.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.plans.modular
+
+import org.apache.carbondata.mv.plans._
+import org.apache.carbondata.mv.plans.util.{Signature, SignatureGenerator, SignatureRule}
+
+object ModularPlanSignatureGenerator extends SignatureGenerator[ModularPlan] {
+  lazy val rule: SignatureRule[ModularPlan] = ModularPlanRule
+
+  override def generate(plan: ModularPlan): Option[Signature] = {
+    if (plan.isSPJGH) {
+      super.generate(plan)
+    } else {
+      None
+    }
+  }
+}
+
+object ModularPlanRule extends SignatureRule[ModularPlan] {
+
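+  // A Signature records whether a GroupBy has been applied and the set of base
+  // tables ("db.table") the plan reads, so two plans can be compared cheaply.
+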
+  def apply(plan: ModularPlan, childSignatures: Seq[Option[Signature]]): Option[Signature] = {
+
+    plan match {
+      case modular.Select(_, _, _, _, _, _, _, _, _) =>
+        if (childSignatures.map { _.getOrElse(Signature()).groupby }.forall(x => !x)) {
+          Some(Signature(
+            groupby = false,
+            childSignatures.flatMap { _.getOrElse(Signature()).datasets.toSeq }.toSet))
+        } else if (childSignatures.length == 1 &&
+                   childSignatures(0).getOrElse(Signature()).groupby) {
+          childSignatures(0)
+        } else {
+          None
+        }
+      case modular.GroupBy(_, _, _, _, _, _, _) =>
+        if (childSignatures.length == 1 && !childSignatures(0).getOrElse(Signature()).groupby) {
+          Some(Signature(groupby = true, childSignatures(0).getOrElse(Signature()).datasets))
+        } else {
+          None
+        }
+      case HarmonizedRelation(source) =>
+        source.signature match {
+          case Some(s) =>
+            Some(Signature(groupby = false, s.datasets))
+          case _ =>
+            None
+        }
+      case modular.ModularRelation(dbname, tblname, _, _, _) =>
+        if (dbname != null && tblname != null) {
+          Some(Signature(groupby = false, Set(Seq(dbname, tblname).mkString("."))))
+        } else {
+          Some(Signature(groupby = false, Set(plan.toString())))
+        }
+      case _ => None
+    }
+  }
+}
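
A rough worked trace of ModularPlanRule, for illustration only (constructor arguments elided):

// leaf: ModularRelation("default", "fact", ...)
//   => Some(Signature(groupby = false, Set("default.fact")))
// Select(..., children = leaf :: Nil, ...)        -- no grouped child
//   => Some(Signature(groupby = false, Set("default.fact")))
// GroupBy(..., child = select, ...)               -- single ungrouped child
//   => Some(Signature(groupby = true, Set("default.fact")))
// A further GroupBy on top yields None: its only child signature already
// has groupby = true, so no signature is produced.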

http://git-wip-us.apache.org/repos/asf/carbondata/blob/02fd7873/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularRelation.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularRelation.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularRelation.scala
new file mode 100644
index 0000000..6b71e38
--- /dev/null
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularRelation.scala
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.plans.modular
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeReference, NamedExpression}
+import org.apache.spark.sql.catalyst.plans.JoinType
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.catalyst.plans.logical.Statistics
+import org.apache.spark.sql.internal.SQLConf
+
+import org.apache.carbondata.mv.plans.modular.Flags._
+
+object ModularRelation {
+  def apply(outputList: NamedExpression*): ModularRelation = {
+    new ModularRelation(
+      "test",
+      "test",
+      outputList,
+      NoFlags,
+      Seq.empty)
+  }
+}
+
+case class ModularRelation(databaseName: String,
+    tableName: String,
+    outputList: Seq[NamedExpression],
+    flags: FlagSet,
+    rest: Seq[Seq[Any]]) extends LeafNode {
+  override def computeStats(spark: SparkSession, conf: SQLConf): Statistics = {
+    val plan = spark.table(s"${ databaseName }.${ tableName }").queryExecution.optimizedPlan
+    val stats = plan.stats(conf)
+    val output = outputList.map(_.toAttribute)
+    val mapSeq = plan.collect { case n: logical.LeafNode => n }.map {
+      table => AttributeMap(table.output.zip(output))
+    }
+    val rewrites = mapSeq(0)
+    val attributeStats = AttributeMap(stats.attributeStats.iterator
+      .map { pair => (rewrites(pair._1), pair._2) }.toSeq)
+    Statistics(stats.sizeInBytes, stats.rowCount, attributeStats, stats.hints)
+  }
+
+  override def output: Seq[Attribute] = outputList.map(_.toAttribute)
+
+  override def adjacencyList: Map[Int, Seq[(Int, JoinType)]] = Map.empty
+
+  override def equals(that: Any): Boolean = {
+    that match {
+      case that: ModularRelation =>
+        (databaseName != null && tableName != null &&
+         databaseName == that.databaseName && tableName == that.tableName) ||
+        (databaseName == null && tableName == null && that.databaseName == null &&
+         that.tableName == null && output.toString == that.output.toString)
+      case _ => false
+    }
+  }
+
+  // Keep hashCode consistent with the custom equality above.
+  override def hashCode(): Int = {
+    if (databaseName != null && tableName != null) {
+      (databaseName, tableName).hashCode()
+    } else {
+      output.toString.hashCode
+    }
+  }
+}
+
+object HarmonizedRelation {
+  def canHarmonize(source: ModularPlan): Boolean = {
+    source match {
+      case g @ GroupBy(_, _, _, _,
+        Select(_, _, _, _, _, dim :: Nil, NoFlags, Nil, Nil),
+        NoFlags, Nil) if dim.isInstanceOf[ModularRelation] =>
+        // Harmonizable only if every output column is a bare attribute or a
+        // simple alias of an attribute (no computed expressions).
+        g.outputList.forall(col =>
+          col.isInstanceOf[AttributeReference] ||
+          (col.isInstanceOf[Alias] &&
+           col.asInstanceOf[Alias].child.isInstanceOf[AttributeReference]))
+      case _ => false
+    }
+  }
+}
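
For illustration, the only shape canHarmonize accepts is a single-relation aggregate whose output columns are bare attributes or simple aliases of attributes; a hedged sketch (column names hypothetical):

// GroupBy(
//   outputList = Seq(dKey, Alias(dName, "name")()),   // no computed columns
//   ...,
//   child = Select(..., children = dimRelation :: Nil, NoFlags, Nil, Nil),
//   NoFlags, Nil)
// harmonizes when dimRelation is a ModularRelation. A computed output such
// as Alias(dKey + 1, "k1")() fails the forall check and the plan is rejected.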
+
+// support harmonization for dimension table
+case class HarmonizedRelation(source: ModularPlan) extends LeafNode {
+  require(HarmonizedRelation.canHarmonize(source), "invalid plan for harmonized relation")
+  private lazy val relation =
+    source.asInstanceOf[GroupBy].child.children(0).asInstanceOf[ModularRelation]
+  lazy val tableName: String = relation.tableName
+  lazy val databaseName: String = relation.databaseName
+
+  //  override def computeStats(spark: SparkSession, conf: SQLConf): Statistics = source.stats
+  // (spark, conf)
+  override def computeStats(spark: SparkSession, conf: SQLConf): Statistics = {
+    val plan = spark.table(s"${ databaseName }.${ tableName }").queryExecution.optimizedPlan
+    val stats = plan.stats(conf)
+    val output = relation.outputList.map(_.toAttribute)
+    val mapSeq = plan.collect { case n: logical.LeafNode => n }.map {
+      table => AttributeMap(table.output.zip(output))
+    }
+    val rewrites = mapSeq(0)
+    val aliasMap = AttributeMap(
+      source.asInstanceOf[GroupBy].outputList.collect {
+        case a: Alias if a.child.isInstanceOf[Attribute] =>
+          (a.child.asInstanceOf[Attribute], a.toAttribute)
+      })
+    val aStatsIterator = stats.attributeStats.iterator.map { pair => (rewrites(pair._1), pair._2) }
+    val attributeStats = AttributeMap(aStatsIterator
+      .map(pair => (aliasMap.getOrElse(pair._1, pair._1), pair._2)).toSeq)
+
+    Statistics(stats.sizeInBytes, None, attributeStats, stats.hints)
+  }
+
+  override def output: Seq[Attribute] = source.output
+
+  // two harmonized modular relations are equal only if orders of output columns of
+  // their source plans are exactly the same
+  override def equals(that: Any): Boolean = {
+    that match {
+      case that: HarmonizedRelation => source.sameResult(that.source)
+      case _ => false
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/02fd7873/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Modularizer.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Modularizer.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Modularizer.scala
new file mode 100644
index 0000000..d255359
--- /dev/null
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Modularizer.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.plans.modular
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.expressions.{Exists, ListQuery, ScalarSubquery}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.trees.TreeNode
+
+import org.apache.carbondata.mv.expressions.modular._
+import org.apache.carbondata.mv.plans._
+
+abstract class GenericPattern[TreeType <: TreeNode[TreeType]] extends Logging {
+  protected def modularizeLater(plan: LogicalPlan): TreeType
+
+  def apply(plan: LogicalPlan): Seq[TreeType]
+}
+
+abstract class Modularizer[TreeType <: TreeNode[TreeType]] {
+  def patterns: Seq[GenericPattern[TreeType]]
+
+  // protected def modularizeLater(plan: LogicalPlan) = this.modularize(plan).next()
+
+  def modularize(plan: LogicalPlan): Iterator[TreeType] = {
+    val replaced = plan.transformAllExpressions {
+      case s: ScalarSubquery =>
+        if (s.children.isEmpty) {
+          ScalarModularSubquery(
+            modularize(s.plan).next.asInstanceOf[ModularPlan],
+            s.children,
+            s.exprId)
+        } else {
+          throw new UnsupportedOperationException(s"Expression $s is not canonicalized")
+        }
+      case l: ListQuery =>
+        if (l.children.isEmpty) {
+          ModularListQuery(modularize(l.plan).next.asInstanceOf[ModularPlan], l.children, l.exprId)
+        } else {
+          throw new UnsupportedOperationException(s"Expression $l is not canonicalized")
+        }
+      case e: Exists =>
+        if (e.children.isEmpty) {
+          ModularExists(modularize(e.plan).next.asInstanceOf[ModularPlan], e.children, e.exprId)
+        } else {
+          throw new UnsupportedOperationException(s"Expression $e is not canonicalized")
+        }
+      case o => o
+    }
+    //    val replaced = plan
+    val mplans = modularizeCore(replaced)
+    makeupAliasMappings(mplans)
+  }
+
+  private def modularizeCore(plan: LogicalPlan): Iterator[TreeType] = {
+    // Collect modular plan candidates.
+    val candidates = patterns.iterator.flatMap(_ (plan))
+
+    // The candidates may contain placeholders marked as [[modularizeLater]],
+    // so try to replace them by their child plans.
+    val plans = candidates.flatMap { candidate =>
+      val placeholders = collectPlaceholders(candidate)
+
+      if (placeholders.isEmpty) {
+        // Take the candidate as is because it does not contain placeholders.
+        Iterator(candidate)
+      } else {
+        // Plan the logical plan marked as [[modularizeLater]] and replace the placeholders.
+        placeholders.iterator.foldLeft(Iterator(candidate)) {
+          case (candidatesWithPlaceholders, (placeholder, logicalPlan)) =>
+            // Modularize the logical plan for the placeholder
+            val childPlans = this.modularizeCore(logicalPlan)
+
+            candidatesWithPlaceholders.flatMap { candidateWithPlaceholder =>
+              childPlans.map { childPlan =>
+                // Replace the placeholder by the child plan
+                candidateWithPlaceholder.transformUp {
+                  case p if p == placeholder => childPlan
+                }
+              }
+            }
+        }
+      }
+    }
+
+    val pruned = prunePlans(plans)
+    // val iter = patterns.view.flatMap(_(plan)).toIterator
+    supports(
+      pruned.hasNext,
+      s"Modular plan not supported (e.g. has subquery expression) for \n$plan")
+    //    makeupAliasMappings(pruned)
+    pruned
+  }
+
+  /** Collects placeholders marked as [[modularizeLater]] by pattern and its [[LogicalPlan]]s */
+  protected def collectPlaceholders(plan: TreeType): Seq[(TreeType, LogicalPlan)]
+
+  /** Prunes bad plans to prevent combinatorial explosion. */
+  protected def prunePlans(plans: Iterator[TreeType]): Iterator[TreeType]
+
+  protected def makeupAliasMappings(plans: Iterator[TreeType]): Iterator[TreeType]
+
+}
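
To show how the three abstract hooks fit together, a minimal concrete Modularizer, sketched under assumptions (the empty pattern list is a placeholder; a real subclass supplies patterns that emit candidates, possibly containing placeholder nodes for modularizeCore to expand):

object ExampleModularizer extends Modularizer[ModularPlan] {
  // A real implementation supplies patterns that map LogicalPlan fragments
  // to candidate ModularPlans.
  def patterns: Seq[GenericPattern[ModularPlan]] = Nil

  // A real implementation collects the placeholder nodes its patterns
  // marked via modularizeLater, paired with their logical subplans.
  protected def collectPlaceholders(
      plan: ModularPlan): Seq[(ModularPlan, LogicalPlan)] = Nil

  // Keep every candidate: no pruning in this sketch.
  protected def prunePlans(plans: Iterator[ModularPlan]): Iterator[ModularPlan] = plans

  // No alias bookkeeping in this sketch.
  protected def makeupAliasMappings(
      plans: Iterator[ModularPlan]): Iterator[ModularPlan] = plans
}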

http://git-wip-us.apache.org/repos/asf/carbondata/blob/02fd7873/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/basicOperators.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/basicOperators.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/basicOperators.scala
new file mode 100644
index 0000000..b73411e
--- /dev/null
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/basicOperators.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.plans.modular
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, _}
+import org.apache.spark.sql.catalyst.plans.JoinType
+
+import org.apache.carbondata.mv.plans._
+import org.apache.carbondata.mv.plans.modular.Flags._
+
+private[mv] trait Matchable extends ModularPlan {
+  def outputList: Seq[NamedExpression]
+
+  def predicateList: Seq[Expression]
+}
+
+case class GroupBy(
+    outputList: Seq[NamedExpression],
+    inputList: Seq[Expression],
+    predicateList: Seq[Expression],
+    alias: Option[String],
+    child: ModularPlan,
+    flags: FlagSet,
+    flagSpec: Seq[Seq[Any]]) extends UnaryNode with Matchable {
+  override def output: Seq[Attribute] = outputList.map(_.toAttribute)
+}
+
+case class Select(
+    outputList: Seq[NamedExpression],
+    inputList: Seq[Expression],
+    predicateList: Seq[Expression],
+    aliasMap: Map[Int, String],
+    joinEdges: Seq[JoinEdge],
+    children: Seq[ModularPlan],
+    flags: FlagSet,
+    flagSpec: Seq[Seq[Any]],
+    windowSpec: Seq[Seq[Any]]) extends ModularPlan with Matchable {
+  override def output: Seq[Attribute] = outputList.map(_.toAttribute)
+
+  override def adjacencyList: scala.collection.immutable.Map[Int, Seq[(Int, JoinType)]] = {
+    joinEdges.groupBy { _.left }.map { case (k, v) => (k, v.map(e => (e.right, e.joinType))) }
+  }
+
+  override def extractJoinConditions(
+      left: ModularPlan, right: ModularPlan): Seq[Expression] = {
+    predicateList.filter(p => p.references.intersect(left.outputSet).nonEmpty &&
+                              p.references.intersect(right.outputSet).nonEmpty &&
+                              p.references.subsetOf(left.outputSet ++ right.outputSet))
+  }
+
+  override def extractRightEvaluableConditions(
+      left: ModularPlan, right: ModularPlan): Seq[Expression] = {
+    predicateList.filter(p => p.references.subsetOf(left.outputSet ++ right.outputSet) &&
+                              p.references.intersect(right.outputSet).nonEmpty)
+  }
+
+  override def extractEvaluableConditions(plan: ModularPlan): Seq[Expression] = {
+    predicateList.filter(p => canEvaluate(p, plan))
+  }
+}
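
The adjacencyList override groups the flat joinEdges list by left child index; for illustration (using the JoinEdge case class added later in this patch):

// With children = Seq(fact, dim1, dim2) and
//   joinEdges = Seq(JoinEdge(0, 1, Inner), JoinEdge(0, 2, LeftOuter)),
// adjacencyList evaluates to
//   Map(0 -> Seq((1, Inner), (2, LeftOuter)))
// i.e. child 0 joins child 1 inner and child 2 left-outer, the
// FACT (left) join DIM1 (left) join DIM2 shape the harmonizer expects.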
+
+case class Union(children: Seq[ModularPlan], flags: FlagSet, flagSpec: Seq[Seq[Any]])
+  extends ModularPlan {
+  override def output: Seq[Attribute] = children.head.output
+}
+
+case object OneRowTable extends LeafNode {
+  override def output: Seq[Attribute] = Nil
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/02fd7873/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/queryGraph.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/queryGraph.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/queryGraph.scala
new file mode 100644
index 0000000..da6bc15
--- /dev/null
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/queryGraph.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.plans.modular
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.catalyst.plans.JoinType
+
+@DeveloperApi
+case class JoinEdge(left: Int, right: Int, joinType: JoinType)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/02fd7873/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/package.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/package.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/package.scala
new file mode 100644
index 0000000..8c799fe
--- /dev/null
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/package.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression, PredicateHelper}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+
+import org.apache.carbondata.mv.plans.modular.ModularPlan
+import org.apache.carbondata.mv.plans.util.{CheckSPJG, LogicalPlanSignatureGenerator, Signature}
+
+/**
+ * A collection of common abstractions for query plans as well as
+ * a base semantic plan representation.
+ */
+package object plans {
+  @DeveloperApi
+  type Pattern = org.apache.carbondata.mv.plans.modular.ModularPattern
+
+  implicit class LogicalPlanUtils(val plan: LogicalPlan) {
+    lazy val isSPJG: Boolean = CheckSPJG.isSPJG(plan)
+    lazy val signature: Option[Signature] = LogicalPlanSignatureGenerator.generate(plan)
+  }
+
+  implicit class MorePredicateHelper(p: PredicateHelper) {
+    def canEvaluate(expr: Expression, plan: ModularPlan): Boolean = {
+      expr.references.subsetOf(plan.outputSet)
+    }
+
+    def canEvaluate(expr: Expression, exprList: Seq[Expression]): Boolean = {
+      expr.references.subsetOf(AttributeSet(exprList))
+    }
+  }
+
+  def supports(supported: Boolean, message: Any): Unit = {
+    if (!supported) {
+      throw new UnsupportedOperationException(s"unsupported operation: $message")
+    }
+  }
+}
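
A short usage sketch of the enrichments above, for illustration only (lp, mp and cond are assumed values):

// Given an analyzed LogicalPlan lp, a ModularPlan mp, and an Expression cond:
//   lp.isSPJG      // true iff lp is a select-project-join-groupby block
//   lp.signature   // Some(Signature(...)) if one can be derived, else None
//
// Inside any PredicateHelper (e.g. a rewrite rule), the implicit
// MorePredicateHelper supplies:
//   canEvaluate(cond, mp)   // true iff cond references only mp's output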

http://git-wip-us.apache.org/repos/asf/carbondata/blob/02fd7873/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/BirdcageOptimizer.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/BirdcageOptimizer.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/BirdcageOptimizer.scala
new file mode 100644
index 0000000..6363089
--- /dev/null
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/BirdcageOptimizer.scala
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.plans.util
+
+import org.apache.spark.sql.catalyst.analysis._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.optimizer._
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, _}
+import org.apache.spark.sql.catalyst.rules.{RuleExecutor, _}
+import org.apache.spark.sql.internal.SQLConf
+
+object BirdcageOptimizer extends RuleExecutor[LogicalPlan] {
+
+  val conf = new SQLConf()
+    .copy(SQLConf.CASE_SENSITIVE -> true, SQLConf.STARSCHEMA_DETECTION -> true)
+  protected val fixedPoint = FixedPoint(conf.optimizerMaxIterations)
+
+  def batches: Seq[Batch] = {
+    // Technically some of the rules in Finish Analysis are not optimizer rules and belong more
+    // in the analyzer, because they are needed for correctness (e.g. ComputeCurrentTime).
+    // However, because we also use the analyzer to canonicalize queries (for view definitions),
+    // we do not eliminate subqueries or compute current time in the analyzer.
+    Batch(
+      "Finish Analysis", Once,
+      EliminateSubqueryAliases,
+      EliminateView,
+      ReplaceExpressions,
+      ComputeCurrentTime,
+      //      GetCurrentDatabase(sessionCatalog),
+      RewriteDistinctAggregates,
+      ReplaceDeduplicateWithAggregate) ::
+    //////////////////////////////////////////////////////////////////////////////////////////
+    // Optimizer rules start here
+    //////////////////////////////////////////////////////////////////////////////////////////
+    // - Do the first call of CombineUnions before starting the major Optimizer rules,
+    //   since it can reduce the number of iterations, and the other rules could add/move
+    //   extra operators between two adjacent Union operators.
+    // - Call CombineUnions again in Batch("Operator Optimizations"),
+    //   since the other rules might make two separate Unions operators adjacent.
+    Batch(
+      "Union", Once,
+      CombineUnions) ::
+    Batch(
+      "Pullup Correlated Expressions", Once,
+      PullupCorrelatedPredicates) ::
+    Batch(
+      "Subquery", Once,
+      OptimizeSubqueries) ::
+    Batch(
+      "Replace Operators", fixedPoint,
+      ReplaceIntersectWithSemiJoin,
+      ReplaceExceptWithAntiJoin,
+      ReplaceDistinctWithAggregate) ::
+    Batch(
+      "Aggregate", fixedPoint,
+      RemoveLiteralFromGroupExpressions,
+      RemoveRepetitionFromGroupExpressions) ::
+    Batch(
+      "Operator Optimizations", fixedPoint, Seq(
+        // Operator push down
+        PushProjectionThroughUnion,
+        ReorderJoin(conf),
+        EliminateOuterJoin(conf),
+        PushPredicateThroughJoin,
+        PushDownPredicate,
+        //      LimitPushDown(conf),
+        ColumnPruning,
+        //      InferFiltersFromConstraints(conf),
+        // Operator combine
+        CollapseRepartition,
+        CollapseProject,
+        CollapseWindow,
+        CombineFilters,
+        CombineLimits,
+        CombineUnions,
+        // Constant folding and strength reduction
+        NullPropagation(conf),
+        FoldablePropagation,
+        //      OptimizeIn(conf),
+        ConstantFolding,
+        ReorderAssociativeOperator,
+        LikeSimplification,
+        BooleanSimplification,
+        SimplifyConditionals,
+        RemoveDispensableExpressions,
+        SimplifyBinaryComparison,
+        //      PruneFilters(conf),
+        EliminateSorts,
+        SimplifyCasts,
+        SimplifyCaseConversionExpressions,
+        RewriteCorrelatedScalarSubquery,
+        EliminateSerialization,
+        RemoveRedundantAliases,
+        RemoveRedundantProject,
+        SimplifyCreateStructOps,
+        SimplifyCreateArrayOps,
+        SimplifyCreateMapOps) ++
+                                            extendedOperatorOptimizationRules: _*) ::
+    Batch(
+      "Check Cartesian Products", Once,
+      CheckCartesianProducts(conf)) ::
+    //    Batch("Join Reorder", Once,
+    //      CostBasedJoinReorder(conf)) ::
+    //    Batch("Decimal Optimizations", fixedPoint,
+    //      DecimalAggregates(conf)) ::
+    Batch(
+      "Object Expressions Optimization", fixedPoint,
+      EliminateMapObjects,
+      CombineTypedFilters) ::
+    //    Batch("LocalRelation", fixedPoint,
+    //      ConvertToLocalRelation,
+    //      PropagateEmptyRelation) ::
+    Batch(
+      "OptimizeCodegen", Once,
+      OptimizeCodegen(conf)) ::
+    Batch(
+      "RewriteSubquery", Once,
+      RewritePredicateSubquery,
+      CollapseProject) :: Nil
+  }
+
+  /**
+   * Optimize all the subqueries inside expression.
+   */
+  object OptimizeSubqueries extends Rule[LogicalPlan] {
+    def apply(plan: LogicalPlan): LogicalPlan = {
+      plan transformAllExpressions {
+        case s: SubqueryExpression =>
+          val Subquery(newPlan) = BirdcageOptimizer.this.execute(Subquery(s.plan))
+          s.withNewPlan(newPlan)
+      }
+    }
+  }
+
+  /**
+   * Override to provide additional rules for the operator optimization batch.
+   */
+  def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] = {
+    Nil
+  }
+}
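
Since BirdcageOptimizer is a plain Catalyst RuleExecutor, applying it is a single call; a minimal usage sketch, assuming an already-analyzed plan:

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

import org.apache.carbondata.mv.plans.util.BirdcageOptimizer

// Runs the batches above (Finish Analysis through RewriteSubquery) and
// returns the canonicalized plan that the modularizer consumes.
def canonicalize(analyzed: LogicalPlan): LogicalPlan =
  BirdcageOptimizer.execute(analyzed)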
+
+/**
+ * Pushes an [[Aggregate]] through a join to the fact table.
+ * Pushes down [[Aggregate]] operators whose `grouping` and `aggregate` expressions can
+ * be evaluated using only the attributes of the fact table, i.e. the left or right side
+ * of a star-join.
+ * Other [[Aggregate]] expressions stay in the original [[Aggregate]].
+ *
+ * See 'Aggregate Pushdown Over Join: Design & Preliminary Results' by LiTao for details.
+ */
+// case class PushAggregateThroughJoin(conf: SQLConf) extends Rule[LogicalPlan] with
+// PredicateHelper {
+//
+//  val tableCluster = {
+//    val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
+//    val tableClusterString = conf.getConfString("spark.mv.tableCluster")
+//    mapper.readValue(tableClusterString, classOf[TableCluster])
+//  }
+//
+//  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+//
+//    // Push down aggregate expressions through Join
+//    case a @ Aggregate(grouping, aggregate, Project(projectList, Join(left, right, jt, cond)))
+//        if (left.isInstanceOf[LeafNode] && => {
+//      val fTables: Set[String] = tableCluster.getFact
+//      val dTables: Set[String] = tableCluster.getDimension
+// //      if canPushThrough(left,a)
+//
+//      if (fTables.contains(s"${left.databaseName}.${left.tableName}")
+//          Aggregate(newGrouping, newAggregate, Project(projectList, Join(Aggregate(_,_,Project
+// (projectList1, left)), right, jt, cond)))
+//      }
+//    }
+//
+//  private def canPushThrough(join: Join): Boolean = join match {
+//    case Join(left : LeafNode, right: LeafNode, Inner, EqualTo(l: AttributeReference,
+// r: AttributeReference)) => true
+//
+//
+//  }
+//
+//
+// }

