flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-10474) Don't translate IN with Literals to JOIN with VALUES for streaming queries
Date Tue, 16 Oct 2018 09:27:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-10474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16651385#comment-16651385
] 

ASF GitHub Bot commented on FLINK-10474:
----------------------------------------

asfgit closed pull request #6792: [FLINK-10474][table] Don't translate IN/NOT_IN to JOIN with
VALUES
URL: https://github.com/apache/flink/pull/6792
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index d740c3f1f99..564fb8648a2 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -33,6 +33,7 @@ import org.apache.calcite.schema.impl.AbstractTable
 import org.apache.calcite.sql._
 import org.apache.calcite.sql.parser.SqlParser
 import org.apache.calcite.sql.util.ChainedSqlOperatorTable
+import org.apache.calcite.sql2rel.SqlToRelConverter
 import org.apache.calcite.tools._
 import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
@@ -108,6 +109,15 @@ abstract class TableEnvironment(val config: TableConfig) {
   // registered external catalog names -> catalog
   private val externalCatalogs = new mutable.HashMap[String, ExternalCatalog]
 
+  // configuration for SqlToRelConverter
+  private[flink] lazy val sqlToRelConverterConfig: SqlToRelConverter.Config = {
+    val calciteConfig = config.getCalciteConfig
+    calciteConfig.getSqlToRelConverterConfig match {
+      case Some(c) => c
+      case None => getSqlToRelConverterConfig
+    }
+  }
+
   /** Returns the table config to define the runtime behavior of the Table API. */
   def getConfig: TableConfig = config
 
@@ -118,6 +128,17 @@ abstract class TableEnvironment(val config: TableConfig) {
     case _ => null
   }
 
+  /**
+    * Returns the SqlToRelConverter config.
+    */
+  protected def getSqlToRelConverterConfig: SqlToRelConverter.Config = {
+    SqlToRelConverter.configBuilder()
+      .withTrimUnusedFields(false)
+      .withConvertTableAccess(false)
+      .withInSubQueryThreshold(Integer.MAX_VALUE)
+      .build()
+  }
+
   /**
     * Returns the operator table for this environment including a custom Calcite configuration.
     */
@@ -698,7 +719,8 @@ abstract class TableEnvironment(val config: TableConfig) {
     * @return The result of the query as Table
     */
   def sqlQuery(query: String): Table = {
-    val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)
+    val planner = new FlinkPlannerImpl(
+      getFrameworkConfig, getPlanner, getTypeFactory, sqlToRelConverterConfig)
     // parse the sql query
     val parsed = planner.parse(query)
     if (null != parsed && parsed.getKind.belongsTo(SqlKind.QUERY)) {
@@ -758,7 +780,8 @@ abstract class TableEnvironment(val config: TableConfig) {
     * @param config The [[QueryConfig]] to use.
     */
   def sqlUpdate(stmt: String, config: QueryConfig): Unit = {
-    val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)
+    val planner = new FlinkPlannerImpl(
+      getFrameworkConfig, getPlanner, getTypeFactory, sqlToRelConverterConfig)
     // parse the sql query
     val parsed = planner.parse(stmt)
     parsed match {
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/CalciteConfig.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/CalciteConfig.scala
index accc628c914..b5d6c06e40f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/CalciteConfig.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/CalciteConfig.scala
@@ -25,6 +25,7 @@ import org.apache.calcite.plan.RelOptRule
 import org.apache.calcite.sql.SqlOperatorTable
 import org.apache.calcite.sql.parser.SqlParser
 import org.apache.calcite.sql.util.ChainedSqlOperatorTable
+import org.apache.calcite.sql2rel.SqlToRelConverter
 import org.apache.calcite.tools.{RuleSet, RuleSets}
 import org.apache.flink.util.Preconditions
 
@@ -72,6 +73,11 @@ class CalciteConfigBuilder {
     */
   private var replaceSqlParserConfig: Option[SqlParser.Config] = None
 
+  /**
+    * Defines a configuration for SqlToRelConverter.
+    */
+  private var replaceSqlToRelConverterConfig: Option[SqlToRelConverter.Config] = None
+
   /**
     * Replaces the built-in normalization rule set with the given rule set.
     */
@@ -183,6 +189,15 @@ class CalciteConfigBuilder {
     this
   }
 
+  /**
+    * Replaces the built-in SqlToRelConverter configuration with the given configuration.
+    */
+  def replaceSqlToRelConverterConfig(config: SqlToRelConverter.Config): CalciteConfigBuilder
= {
+    Preconditions.checkNotNull(config)
+    replaceSqlToRelConverterConfig = Some(config)
+    this
+  }
+
   private class CalciteConfigImpl(
       val getNormRuleSet: Option[RuleSet],
       val replacesNormRuleSet: Boolean,
@@ -194,7 +209,8 @@ class CalciteConfigBuilder {
       val replacesDecoRuleSet: Boolean,
       val getSqlOperatorTable: Option[SqlOperatorTable],
       val replacesSqlOperatorTable: Boolean,
-      val getSqlParserConfig: Option[SqlParser.Config])
+      val getSqlParserConfig: Option[SqlParser.Config],
+      val getSqlToRelConverterConfig: Option[SqlToRelConverter.Config])
     extends CalciteConfig
 
 
@@ -233,7 +249,8 @@ class CalciteConfigBuilder {
         Some(operatorTables.reduce((x, y) => ChainedSqlOperatorTable.of(x, y)))
     },
     this.replaceOperatorTable,
-    replaceSqlParserConfig)
+    replaceSqlParserConfig,
+    replaceSqlToRelConverterConfig)
 }
 
 /**
@@ -295,6 +312,11 @@ trait CalciteConfig {
     * Returns a custom SQL parser configuration.
     */
   def getSqlParserConfig: Option[SqlParser.Config]
+
+  /**
+    * Returns a custom configuration for SqlToRelConverter.
+    */
+  def getSqlToRelConverterConfig: Option[SqlToRelConverter.Config]
 }
 
 object CalciteConfig {
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
index de5b8993c28..b9e334a16d9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
@@ -47,7 +47,8 @@ import scala.collection.JavaConversions._
 class FlinkPlannerImpl(
     config: FrameworkConfig,
     planner: RelOptPlanner,
-    typeFactory: FlinkTypeFactory) {
+    typeFactory: FlinkTypeFactory,
+    sqlToRelConverterConfig: SqlToRelConverter.Config) {
 
   val operatorTable: SqlOperatorTable = config.getOperatorTable
   /** Holds the trait definitions to be registered with planner. May be null. */
@@ -97,10 +98,13 @@ class FlinkPlannerImpl(
       assert(validatedSqlNode != null)
       val rexBuilder: RexBuilder = createRexBuilder
       val cluster: RelOptCluster = FlinkRelOptClusterFactory.create(planner, rexBuilder)
-      val config = SqlToRelConverter.configBuilder()
-        .withTrimUnusedFields(false).withConvertTableAccess(false).build()
       val sqlToRelConverter: SqlToRelConverter = new SqlToRelConverter(
-        new ViewExpanderImpl, validator, createCatalogReader, cluster, convertletTable, config)
+        new ViewExpanderImpl,
+        validator,
+        createCatalogReader,
+        cluster,
+        convertletTable,
+        sqlToRelConverterConfig)
       root = sqlToRelConverter.convertQuery(validatedSqlNode, false, true)
       // we disable automatic flattening in order to let composite types pass without modification
       // we might enable it again once Calcite has better support for structured types
@@ -142,10 +146,13 @@ class FlinkPlannerImpl(
       val validatedSqlNode: SqlNode = validator.validate(sqlNode)
       val rexBuilder: RexBuilder = createRexBuilder
       val cluster: RelOptCluster = FlinkRelOptClusterFactory.create(planner, rexBuilder)
-      val config: SqlToRelConverter.Config = SqlToRelConverter.configBuilder
-        .withTrimUnusedFields(false).withConvertTableAccess(false).build
       val sqlToRelConverter: SqlToRelConverter = new SqlToRelConverter(
-        new ViewExpanderImpl, validator, catalogReader, cluster, convertletTable, config)
+        new ViewExpanderImpl,
+        validator,
+        catalogReader,
+        cluster,
+        convertletTable,
+        sqlToRelConverterConfig)
       root = sqlToRelConverter.convertQuery(validatedSqlNode, true, false)
       root = root.withRel(sqlToRelConverter.flattenTypes(root.rel, true))
       root = root.withRel(RelDecorrelator.decorrelateQuery(root.rel))
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
index 19e030b4c0a..4f8dc13ff4c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
@@ -928,6 +928,11 @@ abstract class CodeGenerator(
         val right = operands.tail
         generateIn(this, left, right)
 
+      case NOT_IN =>
+        val left = operands.head
+        val right = operands.tail
+        generateNot(nullCheck, generateIn(this, left, right))
+
       // casting
       case CAST | REINTERPRET =>
         val operand = operands.head
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index e4cd8d10aa3..a4a09f716d1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -164,7 +164,11 @@ object FlinkRuleSets {
     WindowPropertiesHavingRule.INSTANCE,
 
     // expand distinct aggregate to normal aggregate with groupby
-    AggregateExpandDistinctAggregatesRule.JOIN
+    AggregateExpandDistinctAggregatesRule.JOIN,
+
+    // merge a cascade of predicates to IN or NOT_IN
+    ConvertToNotInOrInRule.IN_INSTANCE,
+    ConvertToNotInOrInRule.NOT_IN_INSTANCE
   )
 
   /**
@@ -201,7 +205,11 @@ object FlinkRuleSets {
     ReduceExpressionsRule.FILTER_INSTANCE,
     ReduceExpressionsRule.PROJECT_INSTANCE,
     ReduceExpressionsRule.CALC_INSTANCE,
-    ProjectToWindowRule.PROJECT
+    ProjectToWindowRule.PROJECT,
+
+    // merge a cascade of predicates to IN or NOT_IN
+    ConvertToNotInOrInRule.IN_INSTANCE,
+    ConvertToNotInOrInRule.NOT_IN_INSTANCE
   )
 
   /**
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/ConvertToNotInOrInRule.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/ConvertToNotInOrInRule.scala
new file mode 100644
index 00000000000..1fd08a9c518
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/ConvertToNotInOrInRule.scala
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.common
+
+import org.apache.calcite.plan.RelOptRule.{any, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptUtil}
+import org.apache.calcite.rel.core.Filter
+import org.apache.calcite.rex.{RexCall, RexLiteral, RexNode}
+import org.apache.calcite.sql.SqlBinaryOperator
+import org.apache.calcite.sql.fun.SqlStdOperatorTable.{IN, NOT_IN, EQUALS, NOT_EQUALS, AND,
OR}
+import org.apache.calcite.tools.RelBuilder
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable
+
+/**
+  * Rule for converting a cascade of predicates to [[IN]] or [[NOT_IN]].
+  *
+  * For example, convert predicate: (x = 1 OR x = 2 OR x = 3) AND y = 4 to
+  * predicate: x IN (1, 2, 3) AND y = 4.
+  *
+  * @param toOperator       The toOperator, for example, when convert to [[IN]], toOperator
is
+  *                         [[IN]]. We convert a cascade of [[EQUALS]] to [[IN]].
+  * @param description      The description of the rule.
+  */
+class ConvertToNotInOrInRule(
+    toOperator: SqlBinaryOperator,
+    description: String)
+  extends RelOptRule(
+    operand(classOf[Filter], any),
+    description) {
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+    val filter: Filter = call.rel(0)
+    convertToNotInOrIn(call.builder(), filter.getCondition) match {
+      case Some(newRex) =>
+        call.transformTo(filter.copy(
+          filter.getTraitSet,
+          filter.getInput,
+          newRex))
+
+      case None => // do nothing
+    }
+  }
+
+  /**
+    * Returns a condition decomposed by [[AND]] or [[OR]].
+    */
+  private def decomposedBy(rex: RexNode, operator: SqlBinaryOperator): Seq[RexNode] = {
+    operator match {
+      case AND => RelOptUtil.conjunctions(rex)
+      case OR => RelOptUtil.disjunctions(rex)
+    }
+  }
+
+  /**
+    * Convert a cascade predicates to [[IN]] or [[NOT_IN]].
+    *
+    * @param builder The [[RelBuilder]] to build the [[RexNode]].
+    * @param rex     The predicates to be converted.
+    * @return The converted predicates.
+    */
+  private def convertToNotInOrIn(
+    builder: RelBuilder,
+    rex: RexNode): Option[RexNode] = {
+
+    // For example, when convert to [[IN]], fromOperator is [[EQUALS]].
+    // We convert a cascade of [[EQUALS]] to [[IN]].
+    // A connect operator is used to connect the fromOperator.
+    // A composed operator may contains sub [[IN]] or [[NOT_IN]].
+    val (fromOperator, connectOperator, composedOperator) = toOperator match {
+      case IN => (EQUALS, OR, AND)
+      case NOT_IN => (NOT_EQUALS, AND, OR)
+    }
+
+    val decomposed = decomposedBy(rex, connectOperator)
+    val combineMap = new mutable.HashMap[String, mutable.ListBuffer[RexCall]]
+    val rexBuffer = new mutable.ArrayBuffer[RexNode]
+    var beenConverted = false
+
+    // traverse decomposed predicates
+    decomposed.foreach {
+      case call: RexCall =>
+        call.getOperator match {
+          // put same predicates into combine map
+          case `fromOperator` =>
+            (call.operands(0), call.operands(1)) match {
+              case (ref, _: RexLiteral) =>
+                combineMap.getOrElseUpdate(ref.toString, mutable.ListBuffer[RexCall]()) +=
call
+              case (l: RexLiteral, ref) =>
+                combineMap.getOrElseUpdate(ref.toString, mutable.ListBuffer[RexCall]()) +=
+                  call.clone(call.getType, List(ref, l))
+              case _ => rexBuffer += call
+            }
+
+          // process sub predicates
+          case `composedOperator` =>
+            val newRex = decomposedBy(call, composedOperator).map({ r =>
+              convertToNotInOrIn(builder, r) match {
+                case Some(newRex) =>
+                  beenConverted = true
+                  newRex
+                case None => r
+              }
+            })
+            composedOperator match {
+              case AND => rexBuffer += builder.and(newRex)
+              case OR => rexBuffer += builder.or(newRex)
+            }
+
+          case _ => rexBuffer += call
+        }
+
+      case rex => rexBuffer += rex
+    }
+
+    combineMap.values.foreach { list =>
+      // only convert to IN or NOT_IN when size >= 3.
+      if (list.size >= 3) {
+        val inputRef = list.head.getOperands.head
+        val values = list.map(_.getOperands.last)
+        rexBuffer += builder.getRexBuilder.makeCall(toOperator, List(inputRef) ++ values)
+        beenConverted = true
+      } else {
+        connectOperator match {
+          case AND => rexBuffer += builder.and(list)
+          case OR => rexBuffer += builder.or(list)
+        }
+      }
+    }
+
+    if (beenConverted) {
+      // return result if has been converted
+      connectOperator match {
+        case AND => Some(builder.and(rexBuffer))
+        case OR => Some(builder.or(rexBuffer))
+      }
+    } else {
+      None
+    }
+  }
+}
+
+object ConvertToNotInOrInRule {
+
+  /**
+    * Rule to convert multi [[EQUALS]] to [[IN]].
+    *
+    * For example, convert predicate: (x = 1 OR x = 2 OR x = 3) AND y = 4 to
+    * predicate: x IN (1, 2, 3) AND y = 4.
+    *
+    */
+  val IN_INSTANCE = new ConvertToNotInOrInRule(IN, "MergeMultiEqualsToInRule")
+
+  /**
+    * Rule to convert multi [[NOT_EQUALS]] to [[NOT_IN]].
+    *
+    * For example, convert predicate: (x <> 1 AND x <> 2 AND x <> 3) OR
y <> 4 to
+    * predicate: x NOT_IN (1, 2, 3) OR y <> 4.
+    *
+    */
+  val NOT_IN_INSTANCE = new ConvertToNotInOrInRule(NOT_IN, "MergeMultiNotEqualsToNotInRule")
+}
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/CalcTest.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/CalcTest.scala
index 6271facd680..382aeddfe91 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/CalcTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/CalcTest.scala
@@ -47,4 +47,40 @@ class CalcTest extends TableTestBase {
       "SELECT MyTable.a.*, c, MyTable.b.* FROM MyTable",
       expected)
   }
+
+  @Test
+  def testIn(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+    val resultStr = (1 to 30).mkString(", ")
+    val expected = unaryNode(
+      "DataSetCalc",
+      batchTableNode(0),
+      term("select", "a", "b", "c"),
+      term("where", s"IN(b, $resultStr)")
+    )
+
+    util.verifySql(
+      s"SELECT * FROM MyTable WHERE b in ($resultStr)",
+      expected)
+  }
+
+  @Test
+  def testNotIn(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+    val resultStr = (1 to 30).mkString(", ")
+    val expected = unaryNode(
+      "DataSetCalc",
+      batchTableNode(0),
+      term("select", "a", "b", "c"),
+      term("where", s"NOT IN(b, $resultStr)")
+    )
+
+    util.verifySql(
+      s"SELECT * FROM MyTable WHERE b NOT IN ($resultStr)",
+      expected)
+  }
 }
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/CalcTest.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/CalcTest.scala
index 8cbc03c5cfd..6f2718b22b3 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/CalcTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/CalcTest.scala
@@ -113,6 +113,40 @@ class CalcTest extends TableTestBase {
 
     util.verifyTable(resultTable, expected)
   }
+
+  @Test
+  def testIn(): Unit = {
+    val util = streamTestUtil()
+    val sourceTable = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    val resultTable = sourceTable.select('a, 'b, 'c)
+      .where(s"${(1 to 30).map("b = " + _).mkString(" || ")} && c = 'xx'")
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      streamTableNode(0),
+      term("select", "a", "b", "c"),
+      term("where", s"AND(IN(b, ${(1 to 30).mkString(", ")}), =(c, 'xx'))")
+    )
+
+    util.verifyTable(resultTable, expected)
+  }
+
+  @Test
+  def testNotIn(): Unit = {
+    val util = streamTestUtil()
+    val sourceTable = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    val resultTable = sourceTable.select('a, 'b, 'c)
+      .where(s"${(1 to 30).map("b != " + _).mkString(" && ")} || c != 'xx'")
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      streamTableNode(0),
+      term("select", "a", "b", "c"),
+      term("where", s"OR(NOT IN(b, ${(1 to 30).mkString(", ")}), <>(c, 'xx'))")
+    )
+
+    util.verifyTable(resultTable, expected)
+  }
 }
 
 
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/CalciteConfigBuilderTest.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/CalciteConfigBuilderTest.scala
index c73c20cb825..cb5c6908f64 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/CalciteConfigBuilderTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/CalciteConfigBuilderTest.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.calcite
 
 import org.apache.calcite.rel.rules._
 import org.apache.calcite.sql.fun.{OracleSqlOperatorTable, SqlStdOperatorTable}
+import org.apache.calcite.sql2rel.SqlToRelConverter
 import org.apache.calcite.tools.RuleSets
 import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules
 import org.junit.Assert._
@@ -397,4 +398,20 @@ class CalciteConfigBuilderTest {
     }
 
   }
+
+  @Test
+  def testReplaceSqlToRelConverterConfig(): Unit = {
+    val config = SqlToRelConverter.configBuilder()
+      .withTrimUnusedFields(false)
+      .withConvertTableAccess(false)
+      .withInSubQueryThreshold(Integer.MAX_VALUE)
+      .build()
+
+    val cc: CalciteConfig = new CalciteConfigBuilder()
+      .replaceSqlToRelConverterConfig(config)
+      .build()
+
+    assertTrue(cc.getSqlToRelConverterConfig.isDefined)
+    assertEquals(Integer.MAX_VALUE, cc.getSqlToRelConverterConfig.get.getInSubQueryThreshold)
+  }
 }
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala
index dd6e00e73bc..8c0e5e3d16c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala
@@ -66,7 +66,8 @@ abstract class ExpressionTestBase {
   private val planner = new FlinkPlannerImpl(
     context._2.getFrameworkConfig,
     context._2.getPlanner,
-    context._2.getTypeFactory)
+    context._2.getTypeFactory,
+    context._2.sqlToRelConverterConfig)
   private val logicalOptProgram = Programs.ofRules(FlinkRuleSets.LOGICAL_OPT_RULES)
   private val dataSetOptProgram = Programs.ofRules(FlinkRuleSets.DATASET_OPT_RULES)
 
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
index 4f1eb6863af..a4e9b631b60 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
@@ -256,14 +256,15 @@ class CalcITCase(
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
-    val sqlQuery = "SELECT * FROM MyTable WHERE a < 2 OR a > 20"
+    val sqlQuery = "SELECT * FROM MyTable WHERE a < 2 OR a > 20 OR a IN(3,4,5)"
 
     val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
     tEnv.registerTable("MyTable", ds)
 
     val result = tEnv.sqlQuery(sqlQuery)
 
-    val expected = "1,1,Hi\n" + "21,6,Comment#15\n"
+    val expected = "1,1,Hi\n" + "3,2,Hello world\n" + "4,3,Hello world, how are you?\n" +
+      "5,3,I am fine.\n" + "21,6,Comment#15\n"
     val results = result.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
@@ -274,15 +275,14 @@ class CalcITCase(
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
-    val sqlQuery = "SELECT * FROM MyTable WHERE MOD(a,2)<>0 AND MOD(b,2)=0"
+    val sqlQuery = "SELECT * FROM MyTable WHERE MOD(a,2)<>0 AND MOD(b,2)=0 AND b NOT
IN(1,2,3)"
 
     val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
     tEnv.registerTable("MyTable", ds)
 
     val result = tEnv.sqlQuery(sqlQuery)
 
-    val expected = "3,2,Hello world\n" + "7,4,Comment#1\n" +
-      "9,4,Comment#3\n" + "17,6,Comment#11\n" +
+    val expected = "7,4,Comment#1\n" + "9,4,Comment#3\n" + "17,6,Comment#11\n" +
       "19,6,Comment#13\n" + "21,6,Comment#15\n"
     val results = result.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
index 6407ebdce22..9d29018fdb3 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
@@ -201,15 +201,14 @@ class CalcITCase extends AbstractTestBase {
     val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
 
     val filterDs = ds.filter( 'a % 2 === 0 )
+      .where("b = 3 || b = 4 || b = 5")
     val results = filterDs.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
     val expected = mutable.MutableList(
-      "2,2,Hello", "4,3,Hello world, how are you?",
-      "6,3,Luke Skywalker", "8,4,Comment#2", "10,4,Comment#4",
-      "12,5,Comment#6", "14,5,Comment#8", "16,6,Comment#10",
-      "18,6,Comment#12", "20,6,Comment#14")
+      "4,3,Hello world, how are you?", "6,3,Luke Skywalker",
+      "8,4,Comment#2", "10,4,Comment#4", "12,5,Comment#6", "14,5,Comment#8")
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
@@ -225,12 +224,12 @@ class CalcITCase extends AbstractTestBase {
     val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
 
     val filterDs = ds.filter( 'a % 2 !== 0)
+      .where("b != 1 && b != 2 && b != 3")
     val results = filterDs.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink[Row])
     env.execute()
     val expected = mutable.MutableList(
-      "1,1,Hi", "3,2,Hello world",
-      "5,3,I am fine.", "7,4,Comment#1", "9,4,Comment#3",
+      "7,4,Comment#1", "9,4,Comment#3",
       "11,5,Comment#5", "13,5,Comment#7", "15,5,Comment#9",
       "17,6,Comment#11", "19,6,Comment#13", "21,6,Comment#15")
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Don't translate IN with Literals to JOIN with VALUES for streaming queries
> --------------------------------------------------------------------------
>
>                 Key: FLINK-10474
>                 URL: https://issues.apache.org/jira/browse/FLINK-10474
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API &amp; SQL
>    Affects Versions: 1.6.1, 1.7.0
>            Reporter: Fabian Hueske
>            Assignee: Hequn Cheng
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.7.0
>
>
> IN predicates with literals are translated to JOIN with VALUES if the number of elements
in the IN clause exceeds a certain threshold. This should not be done, because a streaming
join is very heavy and materializes both inputs (which is fine for the VALUES) input but not
for the other.
> There are two ways to solve this:
>  # don't translate IN to a JOIN at all
>  # translate it to a JOIN but have a special join strategy if one input is bound and
final (non-updating)
> Option 1. should be easy to do, option 2. requires much more effort.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message