flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject flink git commit: [FLINK-7357] [table] Fix translation of group window queries with window props and HAVING.
Date Wed, 20 Sep 2017 14:29:43 GMT
Repository: flink
Updated Branches:
  refs/heads/release-1.3 100951e27 -> cc71dec10


[FLINK-7357] [table] Fix translation of group window queries with window props and HAVING.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cc71dec1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cc71dec1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cc71dec1

Branch: refs/heads/release-1.3
Commit: cc71dec108f28562bca5f99c53950a7be6d1ba54
Parents: 100951e
Author: Rong Rong <rongr@uber.com>
Authored: Thu Aug 10 10:46:25 2017 -0700
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Wed Sep 20 11:00:33 2017 +0200

----------------------------------------------------------------------
 .../flink/table/plan/rules/FlinkRuleSets.scala  |   4 +-
 .../common/WindowStartEndPropertiesRule.scala   | 169 ++++++++++++-------
 .../scala/batch/sql/WindowAggregateTest.scala   |  42 +++++
 .../table/api/scala/stream/sql/SqlITCase.scala  |  51 ++++++
 .../scala/stream/sql/WindowAggregateTest.scala  |  39 +++++
 5 files changed, 243 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cc71dec1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index bb3833a..e11f3e0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -137,7 +137,8 @@ object FlinkRuleSets {
 
     // Transform window to LogicalWindowAggregate
     DataSetLogicalWindowAggregateRule.INSTANCE,
-    WindowStartEndPropertiesRule.INSTANCE
+    WindowStartEndPropertiesRule.INSTANCE,
+    WindowStartEndPropertiesHavingRule.INSTANCE
   )
 
   /**
@@ -169,6 +170,7 @@ object FlinkRuleSets {
     // Transform window to LogicalWindowAggregate
     DataStreamLogicalWindowAggregateRule.INSTANCE,
     WindowStartEndPropertiesRule.INSTANCE,
+    WindowStartEndPropertiesHavingRule.INSTANCE,
 
     // simplify expressions rules
     ReduceExpressionsRule.FILTER_INSTANCE,

http://git-wip-us.apache.org/repos/asf/flink/blob/cc71dec1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala
index 14e9b21..33190e6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala
@@ -18,20 +18,19 @@
 
 package org.apache.flink.table.plan.rules.common
 
-import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
-import org.apache.calcite.rel.logical.LogicalProject
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand}
+import org.apache.calcite.rel.logical.{LogicalFilter, LogicalProject}
 import org.apache.calcite.rex.{RexCall, RexNode}
 import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.tools.RelBuilder
 import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
 import org.apache.flink.table.expressions.{WindowEnd, WindowStart}
 import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate
 
 import scala.collection.JavaConversions._
 
-class WindowStartEndPropertiesRule
-  extends RelOptRule(
-    WindowStartEndPropertiesRule.WINDOW_EXPRESSION_RULE_PREDICATE,
-    "WindowStartEndPropertiesRule") {
abstract class WindowStartEndPropertiesBaseRule(rulePredicate: RelOptRuleOperand, ruleName: String)
+  extends RelOptRule(rulePredicate, ruleName) {
 
   override def matches(call: RelOptRuleCall): Boolean = {
     val project = call.rel(0).asInstanceOf[LogicalProject]
@@ -49,61 +48,24 @@ class WindowStartEndPropertiesRule
     project.getProjects.exists(hasGroupAuxiliaries)
   }
 
-  override def onMatch(call: RelOptRuleCall): Unit = {
-
-    val project = call.rel(0).asInstanceOf[LogicalProject]
-    val innerProject = call.rel(1).asInstanceOf[LogicalProject]
-    val agg = call.rel(2).asInstanceOf[LogicalWindowAggregate]
-
-    // Retrieve window start and end properties
-    val transformed = call.builder()
-    val rexBuilder = transformed.getRexBuilder
-    transformed.push(LogicalWindowAggregate.create(
-      agg.getWindow,
-      Seq(
-        NamedWindowProperty("w$start", WindowStart(agg.getWindow.aliasAttribute)),
-        NamedWindowProperty("w$end", WindowEnd(agg.getWindow.aliasAttribute))
-      ), agg)
-    )
-
-    // forward window start and end properties
-    transformed.project(
-      innerProject.getProjects ++ Seq(transformed.field("w$start"), transformed.field("w$end")))
-
-    def replaceGroupAuxiliaries(node: RexNode): RexNode = {
-      node match {
-        case c: RexCall if WindowStartEndPropertiesRule.isWindowStart(c) =>
-          // replace expression by access to window start
-          rexBuilder.makeCast(c.getType, transformed.field("w$start"), false)
-        case c: RexCall if WindowStartEndPropertiesRule.isWindowEnd(c) =>
-          // replace expression by access to window end
-          rexBuilder.makeCast(c.getType, transformed.field("w$end"), false)
-        case c: RexCall =>
-          // replace expressions in children
-          val newOps = c.getOperands.map(replaceGroupAuxiliaries)
-          c.clone(c.getType, newOps)
-        case x =>
-          // preserve expression
-          x
-      }
+  def replaceGroupAuxiliaries(node: RexNode, relBuilder: RelBuilder): RexNode = {
+    val rexBuilder = relBuilder.getRexBuilder
+    node match {
+      case c: RexCall if isWindowStart(c) =>
+        // replace expression by access to window start
+        rexBuilder.makeCast(c.getType, relBuilder.field("w$start"), false)
+      case c: RexCall if isWindowEnd(c) =>
+        // replace expression by access to window end
+        rexBuilder.makeCast(c.getType, relBuilder.field("w$end"), false)
+      case c: RexCall =>
+        // replace expressions in children
+        val newOps = c.getOperands.map(x => replaceGroupAuxiliaries(x, relBuilder))
+        c.clone(c.getType, newOps)
+      case x =>
+        // preserve expression
+        x
     }
-
-    // replace window auxiliary function by access to window properties
-    transformed.project(
-      project.getProjects.map(replaceGroupAuxiliaries)
-    )
-    val res = transformed.build()
-    call.transformTo(res)
   }
-}
-
-object WindowStartEndPropertiesRule {
-  private val WINDOW_EXPRESSION_RULE_PREDICATE =
-    RelOptRule.operand(classOf[LogicalProject],
-      RelOptRule.operand(classOf[LogicalProject],
-        RelOptRule.operand(classOf[LogicalWindowAggregate], RelOptRule.none())))
-
-  val INSTANCE = new WindowStartEndPropertiesRule
 
   /** Checks if a RexNode is a window start auxiliary function. */
   private def isWindowStart(node: RexNode): Boolean = {
@@ -113,7 +75,7 @@ object WindowStartEndPropertiesRule {
           case SqlStdOperatorTable.TUMBLE_START |
                SqlStdOperatorTable.HOP_START |
                SqlStdOperatorTable.SESSION_START
-            => true
+          => true
           case _ => false
         }
       case _ => false
@@ -128,10 +90,95 @@ object WindowStartEndPropertiesRule {
           case SqlStdOperatorTable.TUMBLE_END |
                SqlStdOperatorTable.HOP_END |
                SqlStdOperatorTable.SESSION_END
-            => true
+          => true
           case _ => false
         }
       case _ => false
     }
   }
 }
+
+object WindowStartEndPropertiesRule {
+
+  val INSTANCE = new WindowStartEndPropertiesBaseRule(
+    RelOptRule.operand(classOf[LogicalProject],
+      RelOptRule.operand(classOf[LogicalProject],
+        RelOptRule.operand(classOf[LogicalWindowAggregate], RelOptRule.none()))),
+    "WindowStartEndPropertiesRule") {
+
+    override def onMatch(call: RelOptRuleCall): Unit = {
+
+      val project = call.rel(0).asInstanceOf[LogicalProject]
+      val innerProject = call.rel(1).asInstanceOf[LogicalProject]
+      val agg = call.rel(2).asInstanceOf[LogicalWindowAggregate]
+
+      // Retrieve window start and end properties
+      val builder = call.builder()
+      builder.push(LogicalWindowAggregate.create(
+        agg.getWindow,
+        Seq(
+          NamedWindowProperty("w$start", WindowStart(agg.getWindow.aliasAttribute)),
+          NamedWindowProperty("w$end", WindowEnd(agg.getWindow.aliasAttribute))),
+        agg)
+      )
+
+      // forward window start and end properties
+      builder.project(
+        innerProject.getProjects ++ Seq(builder.field("w$start"), builder.field("w$end")))
+
+      // replace window auxiliary function by access to window properties
+      builder.project(
+        project.getProjects.map(expr => replaceGroupAuxiliaries(expr, builder))
+      )
+      val res = builder.build()
+      call.transformTo(res)
+    }
+  }
+}
+
+object WindowStartEndPropertiesHavingRule {
+
+  val INSTANCE = new WindowStartEndPropertiesBaseRule(
+    RelOptRule.operand(classOf[LogicalProject],
+      RelOptRule.operand(classOf[LogicalFilter],
+        RelOptRule.operand(classOf[LogicalProject],
+          RelOptRule.operand(classOf[LogicalWindowAggregate], RelOptRule.none())))),
+    "WindowStartEndPropertiesHavingRule") {
+
+    override def onMatch(call: RelOptRuleCall): Unit = {
+
+      val project = call.rel(0).asInstanceOf[LogicalProject]
+      val filter = call.rel(1).asInstanceOf[LogicalFilter]
+      val innerProject = call.rel(2).asInstanceOf[LogicalProject]
+      val agg = call.rel(3).asInstanceOf[LogicalWindowAggregate]
+
+      // Retrieve window start and end properties
+      val builder = call.builder()
+      builder.push(LogicalWindowAggregate.create(
+        agg.getWindow,
+        Seq(
+          NamedWindowProperty("w$start", WindowStart(agg.getWindow.aliasAttribute)),
+          NamedWindowProperty("w$end", WindowEnd(agg.getWindow.aliasAttribute))),
+        agg)
+      )
+
+      // forward window start and end properties
+      builder.project(
+        innerProject.getProjects ++ Seq(builder.field("w$start"), builder.field("w$end")))
+
+      // replace window auxiliary function by access to window properties
+      builder.filter(
+        filter.getChildExps.map(expr => replaceGroupAuxiliaries(expr, builder))
+      )
+
+      // replace window auxiliary function by access to window properties
+      builder.project(
+        project.getProjects.map(expr => replaceGroupAuxiliaries(expr, builder))
+      )
+
+      val res = builder.build()
+      call.transformTo(res)
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/cc71dec1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/WindowAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/WindowAggregateTest.scala
index 328c03c..a0afe5e 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/WindowAggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/WindowAggregateTest.scala
@@ -335,4 +335,46 @@ class WindowAggregateTest extends TableTestBase {
       "GROUP BY TUMBLE(ts, INTERVAL '4' MINUTE)"
     util.verifySql(sql, "n/a")
   }
+
+  @Test
+  def testExpressionOnWindowHavingFunction() = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String, Timestamp)]("T", 'a, 'b, 'c, 'ts)
+
+    val sql =
+      "SELECT " +
+        "  COUNT(*), " +
+        "  HOP_START(ts, INTERVAL '15' MINUTE, INTERVAL '1' MINUTE) " +
+        "FROM T " +
+        "GROUP BY HOP(ts, INTERVAL '15' MINUTE, INTERVAL '1' MINUTE) " +
+        "HAVING " +
+        "  SUM(a) > 0 AND " +
+        "  QUARTER(HOP_START(ts, INTERVAL '15' MINUTE, INTERVAL '1' MINUTE)) = 1"
+
+    val expected =
+      unaryNode(
+        "DataSetCalc",
+        unaryNode(
+          "DataSetWindowAggregate",
+          unaryNode(
+            "DataSetCalc",
+            batchTableNode(0),
+            term("select", "ts, a")
+          ),
+          term("window", SlidingGroupWindow('w$, 'ts, 60000.millis, 900000.millis)),
+          term("select",
+            "COUNT(*) AS EXPR$0",
+            "SUM(a) AS $f1",
+            "start('w$) AS w$start",
+            "end('w$) AS w$end")
+        ),
+        term("select", "EXPR$0", "CAST(w$start) AS w$start"),
+        term("where",
+          "AND(>($f1, 0), " +
+            "=(+(/INT(-(EXTRACT_DATE(FLAG(MONTH), /INT(Reinterpret(CAST(w$start)), 86400000)), " +
+              "1), 3), 1), 1))")
+      )
+
+    util.verifySql(sql, expected)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/cc71dec1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
index 55633ff..834c243 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.table.api.scala.stream.sql
 
 import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.api.scala._
@@ -26,6 +27,7 @@ import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamTestDa
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.scala.stream.sql.OverWindowITCase.EventTimeSourceFunction
 import org.apache.flink.types.Row
 import org.junit.Assert._
 import org.junit._
@@ -315,5 +317,54 @@ class SqlITCase extends StreamingWithStateTestBase {
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
+  @Test
+  def testHopStartEndWithHaving(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setStateBackend(getStateBackend)
+    StreamITCase.clear
+    env.setParallelism(1)
+
+    val sqlQueryHopStartEndWithHaving =
+      """
+        |SELECT
+        |  c AS k,
+        |  COUNT(a) AS v,
+        |  HOP_START(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE) AS windowStart,
+        |  HOP_END(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE) AS windowEnd
+        |FROM T1
+        |GROUP BY HOP(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE), c
+        |HAVING
+        |  SUM(b) > 1 AND
+        |    QUARTER(HOP_START(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE)) = 1
+      """.stripMargin
+
+    val data = Seq(
+      Left(14000005L, (1, 1L, "Hi")),
+      Left(14000000L, (2, 1L, "Hello")),
+      Left(14000002L, (3, 1L, "Hello")),
+      Right(14000010L),
+      Left(8640000000L, (4, 1L, "Hello")), // data for the quarter to validate having filter
+      Left(8640000001L, (4, 1L, "Hello")),
+      Right(8640000010L)
+    )
+
+    val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+
+    tEnv.registerTable("T1", t1)
+
+    val resultHopStartEndWithHaving = tEnv.sql(sqlQueryHopStartEndWithHaving).toAppendStream[Row]
+    resultHopStartEndWithHaving.addSink(new StreamITCase.StringSink[Row])
+
+    env.execute()
+
+    val expected = List(
+      "Hello,2,1970-01-01 03:53:00.0,1970-01-01 03:54:00.0"
+    )
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/cc71dec1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
index f95d0ab..f7735ef 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
@@ -224,4 +224,43 @@ class WindowAggregateTest extends TableTestBase {
 
     streamUtil.verifySql(sqlQuery, "n/a")
   }
+
+  @Test
+  def testExpressionOnWindowHavingFunction() = {
+    val sql =
+      "SELECT " +
+        "  COUNT(*), " +
+        "  HOP_START(rowtime, INTERVAL '15' MINUTE, INTERVAL '1' MINUTE) " +
+        "FROM MyTable " +
+        "GROUP BY HOP(rowtime, INTERVAL '15' MINUTE, INTERVAL '1' MINUTE) " +
+        "HAVING " +
+        "  SUM(a) > 0 AND " +
+        "  QUARTER(HOP_START(rowtime, INTERVAL '15' MINUTE, INTERVAL '1' MINUTE)) = 1"
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamGroupWindowAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "rowtime, a")
+          ),
+          term("window", SlidingGroupWindow('w$, 'rowtime, 60000.millis, 900000.millis)),
+          term("select",
+            "COUNT(*) AS EXPR$0",
+            "SUM(a) AS $f1",
+            "start('w$) AS w$start",
+            "end('w$) AS w$end")
+        ),
+        term("select", "EXPR$0", "w$start"),
+        term("where",
+          "AND(>($f1, 0), " +
+            "=(+(/INT(-(EXTRACT_DATE(FLAG(MONTH), /INT(Reinterpret(w$start), 86400000)), 1), " +
+              "3), 1), 1))")
+      )
+
+    streamUtil.verifySql(sql, expected)
+  }
 }


Mime
View raw message