carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ravipes...@apache.org
Subject [44/50] [abbrv] carbondata git commit: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Expression support inside aggregate function for Query
Date Tue, 09 Jan 2018 04:02:12 GMT
[CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Expression support inside aggregate function
for Query

Support transforming of query plan for aggregate table when query aggregate function contains
any expression
Support sub query in Preaggregate table

This closes #1728


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/c70e73f1
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/c70e73f1
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/c70e73f1

Branch: refs/heads/branch-1.3
Commit: c70e73f11b281cde9e8310d5a2d5640c75c30665
Parents: af4277e
Author: kumarvishal <kumarvishal.1802@gmail.com>
Authored: Mon Dec 25 15:04:39 2017 +0530
Committer: ravipesala <ravi.pesala@gmail.com>
Committed: Mon Jan 8 11:52:43 2018 +0530

----------------------------------------------------------------------
 .../schema/table/AggregationDataMapSchema.java  |   51 +-
 .../core/preagg/AggregateQueryPlan.java         |   48 +
 .../core/preagg/AggregateTableSelector.java     |   33 +-
 .../carbondata/core/preagg/QueryColumn.java     |   27 +-
 .../carbondata/core/preagg/QueryPlan.java       |   59 -
 .../TestPreAggregateExpressions.scala           |   68 +
 .../TestPreAggregateTableSelection.scala        |    5 +
 .../TestPreAggregateWithSubQuery.scala          |   88 +
 .../preaaggregate/PreAggregateUtil.scala        |  203 ++-
 .../sql/hive/CarbonPreAggregateRules.scala      | 1633 ++++++++----------
 .../src/main/spark2.1/CarbonSessionState.scala  |    2 +-
 .../src/main/spark2.2/CarbonSessionState.scala  |    2 +-
 12 files changed, 1101 insertions(+), 1118 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/c70e73f1/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java
index 4b2d492..e061812 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java
@@ -62,6 +62,8 @@ public class AggregationDataMapSchema extends DataMapSchema {
    */
   private int ordinal = Integer.MAX_VALUE;
 
+  private Set aggExpToColumnMapping;
+
   public AggregationDataMapSchema(String dataMapName, String className) {
     super(dataMapName, className);
   }
@@ -203,23 +205,6 @@ public class AggregationDataMapSchema extends DataMapSchema {
   }
 
   /**
-   * Below method is to check if parent column with matching aggregate function
-   * @param parentColumnName
-   *                    parent column name
-   * @param aggFunction
-   *                    aggregate function
-   * @return is matching
-   */
-  public boolean isColumnWithAggFunctionExists(String parentColumnName, String aggFunction)
{
-    Set<String> aggFunctions = parentColumnToAggregationsMapping.get(parentColumnName);
-    if (null != aggFunctions && aggFunctions.contains(aggFunction)) {
-      return true;
-    }
-    return false;
-  }
-
-
-  /**
    * Method to prepare mapping of parent to list of aggregation function applied on that
column
    * @param listOfColumns
    *        child column schema list
@@ -341,4 +326,36 @@ public class AggregationDataMapSchema extends DataMapSchema {
   public int getOrdinal() {
     return ordinal;
   }
+
+  /**
+   * Below method will be used to get the aggregation column based on index
+   * It will return the first aggregation column found based on index
+   * @param searchStartIndex
+   *  start index
+   * @param sortedColumnSchema
+   * list of sorted table columns
+   * @return found column list
+   *
+   */
+  public ColumnSchema getAggColumnBasedOnIndex(int searchStartIndex,
+      List<ColumnSchema> sortedColumnSchema) {
+    ColumnSchema columnSchema = null;
+    for (int i = searchStartIndex; i < sortedColumnSchema.size(); i++) {
+      if (!sortedColumnSchema.get(i).getAggFunction().isEmpty()) {
+        columnSchema = sortedColumnSchema.get(i);
+        break;
+      }
+    }
+    return columnSchema;
+  }
+
+  public synchronized Set getAggExpToColumnMapping() {
+    return aggExpToColumnMapping;
+  }
+
+  public synchronized void setAggExpToColumnMapping(Set aggExpToColumnMapping) {
+    if (null == this.aggExpToColumnMapping) {
+      this.aggExpToColumnMapping = aggExpToColumnMapping;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c70e73f1/core/src/main/java/org/apache/carbondata/core/preagg/AggregateQueryPlan.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/preagg/AggregateQueryPlan.java
b/core/src/main/java/org/apache/carbondata/core/preagg/AggregateQueryPlan.java
new file mode 100644
index 0000000..e895bde
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/preagg/AggregateQueryPlan.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.preagg;
+
+import java.util.List;
+
+/**
+ * class to maintain the query plan to select the data map tables
+ */
+public class AggregateQueryPlan {
+
+  /**
+   * List of projection columns
+   */
+  private List<QueryColumn> projectionColumn;
+
+  /**
+   * list of filter columns
+   */
+  private List<QueryColumn> filterColumns;
+
+  public AggregateQueryPlan(List<QueryColumn> projectionColumn, List<QueryColumn>
filterColumns) {
+    this.projectionColumn = projectionColumn;
+    this.filterColumns = filterColumns;
+  }
+
+  public List<QueryColumn> getProjectionColumn() {
+    return projectionColumn;
+  }
+
+  public List<QueryColumn> getFilterColumns() {
+    return filterColumns;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c70e73f1/core/src/main/java/org/apache/carbondata/core/preagg/AggregateTableSelector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/preagg/AggregateTableSelector.java
b/core/src/main/java/org/apache/carbondata/core/preagg/AggregateTableSelector.java
index 5347567..79d0904 100644
--- a/core/src/main/java/org/apache/carbondata/core/preagg/AggregateTableSelector.java
+++ b/core/src/main/java/org/apache/carbondata/core/preagg/AggregateTableSelector.java
@@ -36,15 +36,15 @@ public class AggregateTableSelector {
   /**
    * current query plan
    */
-  private QueryPlan queryPlan;
+  private AggregateQueryPlan aggregateQueryPlan;
 
   /**
    * parent table
    */
   private CarbonTable parentTable;
 
-  public AggregateTableSelector(QueryPlan queryPlan, CarbonTable parentTable) {
-    this.queryPlan = queryPlan;
+  public AggregateTableSelector(AggregateQueryPlan aggregateQueryPlan, CarbonTable parentTable)
{
+    this.aggregateQueryPlan = aggregateQueryPlan;
     this.parentTable = parentTable;
   }
 
@@ -58,9 +58,8 @@ public class AggregateTableSelector {
    * @return selected pre aggregate table schema
    */
   public List<DataMapSchema> selectPreAggDataMapSchema() {
-    List<QueryColumn> projectionColumn = queryPlan.getProjectionColumn();
-    List<QueryColumn> aggColumns = queryPlan.getAggregationColumns();
-    List<QueryColumn> filterColumns = queryPlan.getFilterColumns();
+    List<QueryColumn> projectionColumn = aggregateQueryPlan.getProjectionColumn();
+    List<QueryColumn> filterColumns = aggregateQueryPlan.getFilterColumns();
     List<DataMapSchema> dataMapSchemaList = parentTable.getTableInfo().getDataMapSchemaList();
     List<DataMapSchema> selectedDataMapSchema = new ArrayList<>();
     boolean isMatch;
@@ -74,6 +73,7 @@ public class AggregateTableSelector {
               getColumnSchema(queryColumn, aggregationDataMapSchema);
           if (null == columnSchemaByParentName) {
             isMatch = false;
+            break;
           }
         }
         if (isMatch) {
@@ -99,6 +99,7 @@ public class AggregateTableSelector {
               getColumnSchema(queryColumn, aggregationDataMapSchema);
           if (null == columnSchemaByParentName) {
             isMatch = false;
+            break;
           }
         }
         if (isMatch) {
@@ -110,26 +111,6 @@ public class AggregateTableSelector {
         return selectedDataMapSchema;
       }
     }
-    // match aggregation columns
-    if (null != aggColumns && !aggColumns.isEmpty()) {
-      List<DataMapSchema> dmSchemaToIterate =
-          selectedDataMapSchema.isEmpty() ? dataMapSchemaList : selectedDataMapSchema;
-      selectedDataMapSchema = new ArrayList<>();
-      for (DataMapSchema dmSchema : dmSchemaToIterate) {
-        isMatch = true;
-        for (QueryColumn queryColumn : aggColumns) {
-          AggregationDataMapSchema aggregationDataMapSchema = (AggregationDataMapSchema)
dmSchema;
-          if (!aggregationDataMapSchema
-              .isColumnWithAggFunctionExists(queryColumn.getColumnSchema().getColumnName(),
-                  queryColumn.getAggFunction())) {
-            isMatch = false;
-          }
-        }
-        if (isMatch) {
-          selectedDataMapSchema.add(dmSchema);
-        }
-      }
-    }
     return selectedDataMapSchema;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c70e73f1/core/src/main/java/org/apache/carbondata/core/preagg/QueryColumn.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/preagg/QueryColumn.java b/core/src/main/java/org/apache/carbondata/core/preagg/QueryColumn.java
index c91a703..42daa6c 100644
--- a/core/src/main/java/org/apache/carbondata/core/preagg/QueryColumn.java
+++ b/core/src/main/java/org/apache/carbondata/core/preagg/QueryColumn.java
@@ -30,16 +30,6 @@ public class QueryColumn {
   private ColumnSchema columnSchema;
 
   /**
-   * to store the change data type in case of cast
-   */
-  private String changedDataType;
-
-  /**
-   * aggregation function applied
-   */
-  private String aggFunction;
-
-  /**
    * is filter column
    */
   private boolean isFilterColumn;
@@ -49,11 +39,8 @@ public class QueryColumn {
    */
   private String timeseriesFunction;
 
-  public QueryColumn(ColumnSchema columnSchema, String changedDataType, String aggFunction,
-      boolean isFilterColumn, String timeseriesFunction) {
+  public QueryColumn(ColumnSchema columnSchema, boolean isFilterColumn, String timeseriesFunction)
{
     this.columnSchema = columnSchema;
-    this.changedDataType = changedDataType;
-    this.aggFunction = aggFunction;
     this.isFilterColumn = isFilterColumn;
     this.timeseriesFunction = timeseriesFunction;
   }
@@ -62,14 +49,6 @@ public class QueryColumn {
     return columnSchema;
   }
 
-  public String getChangedDataType() {
-    return changedDataType;
-  }
-
-  public String getAggFunction() {
-    return aggFunction;
-  }
-
   public boolean isFilterColumn() {
     return isFilterColumn;
   }
@@ -92,9 +71,6 @@ public class QueryColumn {
     if (!columnSchema.equals(that.columnSchema)) {
       return false;
     }
-    if (!(aggFunction != null ? aggFunction.equals(that.aggFunction) : that.aggFunction ==
null)) {
-      return false;
-    }
     return timeseriesFunction != null ?
         timeseriesFunction.equals(that.timeseriesFunction) :
         that.timeseriesFunction == null;
@@ -102,7 +78,6 @@ public class QueryColumn {
 
   @Override public int hashCode() {
     int result = columnSchema.hashCode();
-    result = 31 * result + (aggFunction != null ? aggFunction.hashCode() : 0);
     result = 31 * result + (timeseriesFunction != null ? timeseriesFunction.hashCode() :
0);
     result = 31 * result + (isFilterColumn ? 1 : 0);
     return result;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c70e73f1/core/src/main/java/org/apache/carbondata/core/preagg/QueryPlan.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/preagg/QueryPlan.java b/core/src/main/java/org/apache/carbondata/core/preagg/QueryPlan.java
deleted file mode 100644
index 21a34fa..0000000
--- a/core/src/main/java/org/apache/carbondata/core/preagg/QueryPlan.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.preagg;
-
-import java.util.List;
-
-/**
- * class to maintain the query plan to select the data map tables
- */
-public class QueryPlan {
-
-  /**
-   * List of projection columns
-   */
-  private List<QueryColumn> projectionColumn;
-
-  /**
-   * list of aggregation columns
-   */
-  private List<QueryColumn> aggregationColumns;
-
-  /**
-   * list of filter columns
-   */
-  private List<QueryColumn> filterColumns;
-
-  public QueryPlan(List<QueryColumn> projectionColumn, List<QueryColumn> aggregationColumns,
-      List<QueryColumn> filterColumns) {
-    this.projectionColumn = projectionColumn;
-    this.aggregationColumns = aggregationColumns;
-    this.filterColumns = filterColumns;
-  }
-
-  public List<QueryColumn> getProjectionColumn() {
-    return projectionColumn;
-  }
-
-  public List<QueryColumn> getAggregationColumns() {
-    return aggregationColumns;
-  }
-
-  public List<QueryColumn> getFilterColumns() {
-    return filterColumns;
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c70e73f1/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateExpressions.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateExpressions.scala
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateExpressions.scala
index 4171690..0b22c56 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateExpressions.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateExpressions.scala
@@ -17,6 +17,10 @@
 
 package org.apache.carbondata.integration.spark.testsuite.preaggregate
 
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Row}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
@@ -25,6 +29,7 @@ class TestPreAggregateExpressions extends QueryTest with BeforeAndAfterAll
{
   override def beforeAll: Unit = {
     sql("DROP TABLE IF EXISTS mainTable")
     sql("CREATE TABLE mainTable(id int, name string, city string, age string) STORED BY 'org.apache.carbondata.format'")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/measureinsertintotest.csv' into table mainTable")
   }
   test("test pre agg create table with expression 1") {
     sql(
@@ -94,6 +99,69 @@ class TestPreAggregateExpressions extends QueryTest with BeforeAndAfterAll
{
          | """.stripMargin)
     checkExistence(sql("DESCRIBE FORMATTED mainTable_agg5"), true, "maintable_column_0_count")
   }
+  test("test pre agg table selection with expression 1") {
+    val df = sql("select name as NewName, count(age) as sum from mainTable group by name
order by name")
+    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg0")
+  }
+
+
+  test("test pre agg table selection with expression 2") {
+    val df = sql("select name as NewName, sum(case when age=35 then id else 0 end) as sum
from mainTable group by name order by name")
+    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg1")
+  }
+
+  test("test pre agg table selection with expression 3") {
+    val df = sql("select sum(case when age=35 then id else 0 end) from maintable")
+    checkAnswer(df, Seq(Row(6.0)))
+  }
+
+  test("test pre agg table selection with expression 4") {
+    val df = sql("select sum(case when age=27 then id else 0 end) from maintable")
+    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg3")
+    checkAnswer(df, Seq(Row(2.0)))
+  }
+
+  test("test pre agg table selection with expression 5") {
+    val df = sql("select sum(case when age=27 then id else 0 end), sum(case when age=35 then
id else 0 end) from maintable")
+    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg4")
+    checkAnswer(df, Seq(Row(2.0,6.0)))
+  }
+
+  /**
+   * Below method will be used to validate the table name is present in the plan or not
+   * @param plan
+   * query plan
+   * @param actualTableName
+   * table name to be validated
+   */
+  def preAggTableValidator(plan: LogicalPlan, actualTableName: String) : Unit ={
+    var isValidPlan = false
+    plan.transform {
+      // first check whether any pre-aggregate scala function is applied; if it is present in the plan
+      // then the call is from the create pre-aggregate table class, so no need to transform the
query plan
+      case ca:CarbonRelation =>
+        if (ca.isInstanceOf[CarbonDatasourceHadoopRelation]) {
+          val relation = ca.asInstanceOf[CarbonDatasourceHadoopRelation]
+          if(relation.carbonTable.getTableName.equalsIgnoreCase(actualTableName)) {
+            isValidPlan = true
+          }
+        }
+        ca
+      case logicalRelation:LogicalRelation =>
+        if(logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]) {
+          val relation = logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
+          if(relation.carbonTable.getTableName.equalsIgnoreCase(actualTableName)) {
+            isValidPlan = true
+          }
+        }
+        logicalRelation
+    }
+    if(!isValidPlan) {
+      assert(false)
+    } else {
+      assert(true)
+    }
+  }
 
   override def afterAll: Unit = {
     sql("DROP TABLE IF EXISTS mainTable")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c70e73f1/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
index fdeb2bd..322827e 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
@@ -230,6 +230,11 @@ class TestPreAggregateTableSelection extends QueryTest with BeforeAndAfterAll
{
     checkAnswer(df, Row("vishal", 29))
   }
 
+  test("test PreAggregate table selection 29") {
+    val df = sql("select sum(id) from mainTable group by name")
+    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg2")
+  }
+
   override def afterAll: Unit = {
     sql("drop table if exists mainTable")
     sql("drop table if exists lineitem")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c70e73f1/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateWithSubQuery.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateWithSubQuery.scala
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateWithSubQuery.scala
new file mode 100644
index 0000000..0f87803
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateWithSubQuery.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.integration.spark.testsuite.preaggregate
+
+import org.apache.spark.sql.CarbonDatasourceHadoopRelation
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+
+class TestPreAggregateWithSubQuery extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll: Unit = {
+    sql("drop table if exists mainTable")
+    sql("CREATE TABLE mainTable(id int, name string, city string, age string) STORED BY 'org.apache.carbondata.format'")
+    sql("CREATE TABLE mainTable1(id int, name string, city string, age string) STORED BY
'org.apache.carbondata.format'")
+    sql("create datamap agg0 on table mainTable using 'preaggregate' as select name,sum(age)
from mainTable group by name")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/measureinsertintotest.csv' into table mainTable")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/measureinsertintotest.csv' into table mainTable1")
+  }
+
+  test("test sub query PreAggregate table selection 1") {
+    val df = sql("select t2.newnewname as newname from mainTable1 t1 join (select name as
newnewname,sum(age) as sum from mainTable group by name )t2 on t1.name=t2.newnewname group
by t2.newnewname")
+    matchTable(collectLogicalRelation(df.queryExecution.analyzed), "maintable_agg0")
+  }
+
+  test("test sub query PreAggregate table selection 2") {
+    val df = sql("select t1.name,t1.city from mainTable1 t1 join (select name as newnewname,sum(age)
as sum from mainTable group by name )t2 on t1.name=t2.newnewname")
+    matchTable(collectLogicalRelation(df.queryExecution.analyzed), "maintable_agg0")
+  }
+
+  test("test sub query PreAggregate table selection 3") {
+    val df = sql("select t1.name,t2.sum from mainTable1 t1 join (select name as newnewname,sum(age)
as sum from mainTable group by name )t2 on t1.name=t2.newnewname")
+    matchTable(collectLogicalRelation(df.queryExecution.analyzed), "maintable_agg0")
+  }
+
+  test("test sub query PreAggregate table selection 4") {
+    val df = sql("select t1.name,t2.sum from mainTable1 t1 join (select name,sum(age) as
sum from mainTable group by name )t2 on t1.name=t2.name group by t1.name, t2.sum")
+    matchTable(collectLogicalRelation(df.queryExecution.analyzed), "maintable_agg0")
+  }
+
+  /**
+   * Below method will be used to collect all the logical relation from logical plan
+   * @param logicalPlan
+   * query logical plan
+   * @return all the logical relation
+   */
+  def collectLogicalRelation(logicalPlan: LogicalPlan) : Seq[LogicalRelation] = {
+    logicalPlan.collect{
+      case l:LogicalRelation => l
+    }
+  }
+
+  /**
+   * Below method will be used to match the logical relation
+   * @param logicalRelations
+   * all logical relation
+   * @param tableName
+   * table name
+   */
+  def matchTable(logicalRelations: Seq[LogicalRelation], tableName: String) {
+    assert(logicalRelations.exists {
+      case l:LogicalRelation  if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation]
=>
+        l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.getTableName.
+          equalsIgnoreCase(tableName)
+    })
+  }
+
+  override def afterAll: Unit = {
+    sql("drop table if exists mainTable")
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c70e73f1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
index 217436d..86d0c6a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -20,18 +20,18 @@ import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, CarbonSession, SparkSession}
-import org.apache.spark.sql.CarbonExpressions.{CarbonSubqueryAlias => SubqueryAlias, MatchCast
=> Cast, MatchCastExpression}
+import org.apache.spark.sql.CarbonExpressions.{CarbonSubqueryAlias => SubqueryAlias, MatchCastExpression}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedFunction, UnresolvedRelation}
-import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Expression,
NamedExpression, ScalaUDF}
-import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSeq,
Cast, Expression, ExprId, NamedExpression, ScalaUDF}
+import org.apache.spark.sql.catalyst.expressions.aggregate.{Count, _}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.command.{ColumnTableRelation, DataMapField, Field}
 import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.parser.CarbonSpark2SqlParser
-import org.apache.spark.sql.types.DataType
+import org.apache.spark.sql.types.{DataType, LongType}
 
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -85,14 +85,10 @@ object PreAggregateUtil {
 
   /**
    * Below method will be used to get the fields from expressions
-   * @param groupByExp
-   *                  grouping expression
-   * @param aggExp
-   *               aggregate expression
-   * @param logicalRelation
-   *                        logical relation
-   * @param selectStmt
-   *                   select statement
+   * @param groupByExp grouping expression
+   * @param aggExp aggregate expression
+   * @param logicalRelation logical relation
+   * @param selectStmt select statement
    * @return fields from expressions
    */
   def getFieldsFromPlan(groupByExp: Seq[Expression],
@@ -165,16 +161,11 @@ object PreAggregateUtil {
   /**
    * Below method will be used to get the column relation
    * with the parent column which will be used during query and data loading
-   * @param parentColumnName
-   * parent column name
-   * @param parentTableId
-   * parent column id
-   * @param parentTableName
-   * parent table name
-   * @param parentDatabaseName
-   * parent database name
-   * @param carbonTable
-   * carbon table
+   * @param parentColumnName parent column name
+   * @param parentTableId parent table id
+   * @param parentTableName parent table name
+   * @param parentDatabaseName parent database name
+   * @param carbonTable carbon table
    * @return column relation object
    */
   def getColumnRelation(parentColumnName: String,
@@ -280,7 +271,7 @@ object PreAggregateUtil {
           "sum")
         list += createFieldForAggregateExpression(
           exp,
-          changeDataType,
+          LongType,
           carbonTable,
           newColumnName,
           "count")
@@ -293,7 +284,7 @@ object PreAggregateUtil {
           "sum")
         list += createFieldForAggregateExpression(
           exp,
-          avg.dataType,
+          LongType,
           carbonTable,
           newColumnName,
           "count")
@@ -306,16 +297,11 @@ object PreAggregateUtil {
   /**
    * Below method will be used to get the field and its data map field object
    * for aggregate expression
-   * @param expression
-   *                   expression in aggregate function
-   * @param dataType
-   *                 data type
-   * @param carbonTable
-   *                    parent carbon table
-   * @param newColumnName
-   *                      column name of aggregate table
-   * @param aggregationName
-   *                        aggregate function name
+   * @param expression expression in aggregate function
+   * @param dataType data type
+   * @param carbonTable parent carbon table
+   * @param newColumnName column name of aggregate table
+   * @param aggregationName aggregate function name
    * @return field and its metadata tuple
    */
   def createFieldForAggregateExpression(
@@ -552,8 +538,7 @@ object PreAggregateUtil {
    * Below method will be used to update logical plan
    * this is required for creating pre aggregate tables,
    * so @CarbonPreAggregateRules will not be applied during creation
-   * @param logicalPlan
-   *                    actual logical plan
+   * @param logicalPlan actual logical plan
    * @return updated plan
    */
   def updatePreAggQueyPlan(logicalPlan: LogicalPlan): LogicalPlan = {
@@ -654,10 +639,8 @@ object PreAggregateUtil {
   /**
    * Below method will be used to get the select query when rollup policy is
    * applied in case of timeseries table
-   * @param tableSchema
-   *                    main data map schema
-   * @param selectedDataMapSchema
-   *                              selected data map schema for rollup
+   * @param tableSchema main data map schema
+   * @param selectedDataMapSchema selected data map schema for rollup
   * @return select query based on rollup
    */
   def createTimeseriesSelectQueryForRollup(
@@ -695,10 +678,8 @@ object PreAggregateUtil {
    * Below method will be used to creating select query for timeseries
   * for lowest level for aggregation like second level, in that case it will
    * hit the maintable
-   * @param tableSchema
-   *                    data map schema
-   * @param parentTableName
-   *                        parent schema
+   * @param tableSchema data map schema
+   * @param parentTableName parent table name
    * @return select query for loading
    */
   def createTimeSeriesSelectQueryFromMain(tableSchema: TableSchema,
@@ -728,10 +709,8 @@ object PreAggregateUtil {
     /**
    * Below method will be used to select rollup table in case of
    * timeseries data map loading
-   * @param list
-   *             list of timeseries datamap
-   * @param dataMapSchema
-   *                      datamap schema
+   * @param list list of timeseries datamap
+   * @param dataMapSchema datamap schema
    * @return select table name
    */
   def getRollupDataMapNameForTimeSeries(
@@ -748,4 +727,132 @@ object PreAggregateUtil {
       rollupDataMapSchema.lastOption
     }
   }
+
+  /**
+   * Below method will be used to validate aggregate function and get the attribute information
+   * which is applied on select query.
+   * Currently sum, max, min, count, avg is supported;
+   * in case of any other aggregate function it will return an empty sequence.
+   * In case of avg it will return two fields, one for count
+   * and the other for sum of that column, to support rollup.
+   *
+   * @param aggExp aggregate expression
+   * @return list of fields
+   */
+  def validateAggregateFunctionAndGetFields(aggExp: AggregateExpression):
+  Seq[AggregateExpression] = {
+    aggExp.aggregateFunction match {
+      // sum over an explicit cast: keep the cast so the aggregate-table column
+      // is created with the target data type rather than the source column type
+      case Sum(MatchCastExpression(exp: Expression, changeDataType: DataType)) =>
+        Seq(AggregateExpression(Sum(Cast(
+          exp,
+          changeDataType)),
+          aggExp.mode,
+          aggExp.isDistinct))
+      // plain sum: expression usable as-is, pass the original aggregate through
+      case Sum(_: Expression) =>
+        Seq(aggExp)
+      // NOTE(review): the ascription `exp: Seq[Expression]` is erased at runtime, so this
+      // match is unchecked and relies on the shape returned by the MatchCastExpression
+      // extractor; `Cast(exp, ...)` below only compiles if that extractor binds an
+      // Expression-compatible value — verify against MatchCastExpression's unapply
+      case Count(MatchCastExpression(exp: Seq[Expression], changeDataType: DataType)) =>
+        Seq(AggregateExpression(Count(Cast(
+          exp,
+          changeDataType)),
+          aggExp.mode,
+          aggExp.isDistinct))
+      // plain count: pass the original aggregate through unchanged
+      case Count(_: Seq[Expression]) =>
+        Seq(aggExp)
+      case Min(MatchCastExpression(exp: Expression, changeDataType: DataType)) =>
+        Seq(AggregateExpression(Min(Cast(
+          exp,
+          changeDataType)),
+          aggExp.mode,
+          aggExp.isDistinct))
+      // NOTE(review): `exp` is bound but unused here (and in the Max case below);
+      // `case Min(_: Expression)` would express the intent more clearly
+      case Min(exp: Expression) =>
+        Seq(aggExp)
+      case Max(MatchCastExpression(exp: Expression, changeDataType: DataType)) =>
+        Seq(AggregateExpression(Max(Cast(
+          exp,
+          changeDataType)),
+          aggExp.mode,
+          aggExp.isDistinct))
+      case Max(exp: Expression) =>
+        Seq(aggExp)
+      // in case of average need to return two columns
+      // sum and count of the column to added during table creation to support rollup
+      case Average(MatchCastExpression(exp: Expression, changeDataType: DataType)) =>
+        Seq(AggregateExpression(Sum(Cast(
+          exp,
+          changeDataType)),
+          aggExp.mode,
+          aggExp.isDistinct),
+          AggregateExpression(Count(Cast(
+            exp,
+            changeDataType)),
+            aggExp.mode,
+            aggExp.isDistinct))
+      // in case of average need to return two columns
+      // sum and count of the column to added during table creation to support rollup
+      case Average(exp: Expression) =>
+        Seq(AggregateExpression(Sum(exp),
+          aggExp.mode,
+          aggExp.isDistinct),
+          AggregateExpression(Count(exp),
+            aggExp.mode,
+            aggExp.isDistinct))
+      // any other aggregate function is unsupported: signal via empty sequence
+      case _ =>
+        Seq.empty
+    }
+  }
+
+  /**
+   * Below method will be used to get the logical plan from aggregate expression
+   * @param aggExp aggregate expression
+   * @param tableName parent table name
+   * @param databaseName database name
+   * @param logicalRelation logical relation
+   * @param sparkSession spark session used to parse/analyze the generated query
+   * @param parser carbon SQL parser used to wrap the query with the preAGG marker UDF
+   * @return logical plan
+   */
+  def getLogicalPlanFromAggExp(aggExp: AggregateExpression,
+      tableName: String,
+      databaseName: String,
+      logicalRelation: LogicalRelation,
+      sparkSession: SparkSession,
+      parser: CarbonSpark2SqlParser): LogicalPlan = {
+    // adding the preAGG UDF, so pre aggregate data loading rule and query rule will not
+    // be applied
+    // NOTE(review): relies on aggExp.sql producing text resolvable against
+    // $databaseName.$tableName — confirm for expressions with qualified attributes
+    val query = parser.addPreAggFunction(s"Select ${ aggExp.sql } from $databaseName.$tableName")
+    // updating the logical relation of logical plan to so when two logical plan
+    // will be compared it will not consider relation
+    updateLogicalRelation(sparkSession.sql(query).logicalPlan, logicalRelation)
+  }
+
+  /**
+   * Below method will be used to update the logical plan of expression
+   * with parent table logical relation
+   * @param logicalPlan logical plan
+   * @param logicalRelation maintable logical relation
+   * @return updated plan
+   */
+  def updateLogicalRelation(logicalPlan: LogicalPlan,
+      logicalRelation: LogicalRelation): LogicalPlan = {
+    logicalPlan transform {
+      // replace the relation inside every LogicalRelation node so that two plans
+      // over the same parent table compare equal regardless of their original relation
+      case l: LogicalRelation =>
+        l.copy(relation = logicalRelation.relation)
+    }
+  }
+
+  /**
+   * Normalize the exprIds in the given expression, by updating the exprId in
+   * `AttributeReference` with its referenced ordinal from input attributes. It's similar
+   * to `BindReferences` but we do not use `BindReferences` here as the plan may take the
+   * expression as a parameter with type `Attribute`, and replacing it with
+   * `BoundReference` will cause an error.
+   *
+   * @param e expression whose attribute exprIds should be normalized
+   * @param input attribute sequence providing the ordinal for each exprId
+   * @return canonicalized expression with ordinal-based exprIds
+   */
+  def normalizeExprId[T <: Expression](e: T, input: AttributeSeq): T = {
+    e.transformUp {
+      case ar: AttributeReference =>
+        val ordinal = input.indexOf(ar.exprId)
+        if (ordinal == -1) {
+          // attribute not found in input: leave it untouched
+          ar
+        } else {
+          // rewrite the exprId to the ordinal so equivalent expressions compare equal
+          ar.withExprId(ExprId(ordinal))
+        }
+    }.canonicalized.asInstanceOf[T]
+  }
 }


Mime
View raw message