flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From va...@apache.org
Subject [26/50] [abbrv] flink git commit: [FLINK-3504] Fix join translation. Equality predicates may only reference fields.
Date Fri, 18 Mar 2016 13:48:20 GMT
[FLINK-3504] Fix join translation. Equality predicates may only reference fields.

Catch Calcite planner exception and rethrow with additional error message

This closes #1734


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8fa4a99a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8fa4a99a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8fa4a99a

Branch: refs/heads/master
Commit: 8fa4a99a0ee9e40a18424bfdc64e9abe8c72dbb0
Parents: ad3a636
Author: Fabian Hueske <fhueske@apache.org>
Authored: Mon Feb 29 14:46:08 2016 +0100
Committer: vasia <vasia@apache.org>
Committed: Fri Mar 18 14:44:50 2016 +0100

----------------------------------------------------------------------
 .../api/java/table/JavaBatchTranslator.scala    | 25 ++++++++-
 .../plan/rules/logical/FlinkJoinRule.scala      | 54 +++++++++++++++++++-
 .../flink/api/scala/table/test/JoinITCase.scala | 20 ++++++++
 3 files changed, 96 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8fa4a99a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
index 7e8ee77..f238df3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.java.table
 
+import org.apache.calcite.plan.RelOptPlanner.CannotPlanException
 import org.apache.calcite.plan.{RelTraitSet, RelOptUtil}
 import org.apache.calcite.rel.{RelCollations, RelNode}
 import org.apache.calcite.sql2rel.RelDecorrelator
@@ -75,7 +76,17 @@ class JavaBatchTranslator(config: TableConfig) extends PlanTranslator {
     // optimize the logical Flink plan
     val optProgram = Programs.ofRules(FlinkRuleSets.DATASET_OPT_RULES)
     val flinkOutputProps = RelTraitSet.createEmpty()
-    val optPlan = optProgram.run(planner, decorPlan, flinkOutputProps)
+
+    val optPlan = try {
+      optProgram.run(planner, decorPlan, flinkOutputProps)
+    }
+    catch {
+      case e: CannotPlanException =>
+        throw new PlanGenException(
+          s"Cannot generate a valid execution plan for the given query: \n\n" +
+          s"${RelOptUtil.toString(lPlan)}\n" +
+          "Please consider filing a bug report.", e)
+    }
 
     println("---------------")
     println("Optimized Plan:")
@@ -87,7 +98,17 @@ class JavaBatchTranslator(config: TableConfig) extends PlanTranslator {
     val dataSetOutputProps = RelTraitSet.createEmpty()
       .plus(DataSetConvention.INSTANCE)
       .plus(RelCollations.of()).simplify()
-    val dataSetPlan = dataSetProgram.run(planner, optPlan, dataSetOutputProps)
+
+    val dataSetPlan = try {
+      dataSetProgram.run(planner, optPlan, dataSetOutputProps)
+    }
+    catch {
+      case e: CannotPlanException =>
+        throw new PlanGenException(
+          s"Cannot generate a valid execution plan for the given query: \n\n" +
+            s"${RelOptUtil.toString(lPlan)}\n" +
+            "Please consider filing a bug report.", e)
+    }
 
     println("-------------")
     println("DataSet Plan:")

http://git-wip-us.apache.org/repos/asf/flink/blob/8fa4a99a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkJoinRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkJoinRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkJoinRule.scala
index 3826c9a..82f3eaa 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkJoinRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkJoinRule.scala
@@ -18,12 +18,16 @@
 
 package org.apache.flink.api.table.plan.rules.logical
 
-import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
+import org.apache.calcite.plan.{RelOptRuleCall, Convention, RelOptRule, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
 import org.apache.calcite.rel.logical.LogicalJoin
+import org.apache.calcite.rex.{RexInputRef, RexCall}
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.flink.api.table.plan.nodes.logical.{FlinkJoin, FlinkConvention}
 
+import scala.collection.JavaConverters._
+
 class FlinkJoinRule
   extends ConverterRule(
       classOf[LogicalJoin],
@@ -32,6 +36,54 @@ class FlinkJoinRule
       "FlinkJoinRule")
   {
 
+    override def matches(call: RelOptRuleCall): Boolean = {
+
+      val join = call.rel(0).asInstanceOf[LogicalJoin]
+      val children = join.getInputs
+      val rexBuilder = call.builder().getRexBuilder
+
+      val joinInfo = join.analyzeCondition()
+      val joinCondition = join.getCondition
+      val equiCondition =
+        joinInfo.getEquiCondition(children.get(0), children.get(1), rexBuilder)
+
+      // joins require at least one equi-condition
+      if (equiCondition.isAlwaysTrue) {
+        false
+      }
+      else {
+        // check that all equality predicates refer to field refs only (not computed expressions)
+        //   Note: Calcite treats equality predicates on expressions as non-equi predicates
+        joinCondition match {
+
+          // conjunction of join predicates
+          case c: RexCall if c.getOperator.equals(SqlStdOperatorTable.AND) =>
+
+            c.getOperands.asScala
+              // look at equality predicates only
+              .filter { o =>
+                o.isInstanceOf[RexCall] &&
+                o.asInstanceOf[RexCall].getOperator.equals(SqlStdOperatorTable.EQUALS)
+              }
+              // check that both children are field references
+              .map { o =>
+                o.asInstanceOf[RexCall].getOperands.get(0).isInstanceOf[RexInputRef] &&
+                o.asInstanceOf[RexCall].getOperands.get(1).isInstanceOf[RexInputRef]
+              }
+              // any equality predicate that does not refer to a field reference?
+              .reduce( (a, b) => a && b)
+
+          // single equi-join predicate
+          case c: RexCall if c.getOperator.equals(SqlStdOperatorTable.EQUALS) =>
+            c.getOperands.get(0).isInstanceOf[RexInputRef] &&
+              c.getOperands.get(1).isInstanceOf[RexInputRef]
+          case _ =>
+            false
+        }
+      }
+
+    }
+
     def convert(rel: RelNode): RelNode = {
       val join: LogicalJoin = rel.asInstanceOf[LogicalJoin]
       val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConvention.INSTANCE)

http://git-wip-us.apache.org/repos/asf/flink/blob/8fa4a99a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
index bc9db59..7ce77d1 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
@@ -197,4 +197,24 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
     val results = joinT.toDataSet[Row]collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
+
+  @Test
+  def testJoinWithExpressionPreds(): Unit = {
+
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val ds1 = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
+
+    val joinT = ds1.join(ds2).filter('b === 'h + 1 && 'a - 1 === 'd + 2).select('c, 'g)
+
+    val expected =
+        "I am fine.,Hallo Welt\n" +
+        "Luke Skywalker,Hallo Welt wie gehts?\n" +
+        "Luke Skywalker,ABC\n" +
+        "Comment#2,HIJ\n" +
+        "Comment#2,IJK"
+    val results = joinT.toDataSet[Row]collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
 }


Mime
View raw message