flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [1/2] flink git commit: [FLINK-4639] [table] Introduce CalciteConfig to make Calcite features more pluggable.
Date Tue, 25 Oct 2016 14:15:36 GMT
Repository: flink
Updated Branches:
  refs/heads/master 022ceb58b -> 11fa089d6


[FLINK-4639] [table] Introduce CalciteConfig to make Calcite features more pluggable.

This closes #2521
This closes #1617 // closing PR after discussion


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dc0b3430
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dc0b3430
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dc0b3430

Branch: refs/heads/master
Commit: dc0b3430f8d243b1876125e4949e733ae757aa96
Parents: 022ceb5
Author: twalthr <twalthr@apache.org>
Authored: Tue Sep 20 11:35:25 2016 +0200
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Tue Oct 25 12:59:18 2016 +0200

----------------------------------------------------------------------
 .../flink/api/table/BatchTableEnvironment.scala |  14 +-
 .../apache/flink/api/table/CalciteConfig.scala  | 161 ++++++++++++++
 .../api/table/StreamTableEnvironment.scala      |  10 +-
 .../apache/flink/api/table/TableConfig.scala    |  19 +-
 .../flink/api/table/TableEnvironment.scala      |  91 ++++++--
 .../api/java/batch/TableEnvironmentITCase.java  |  16 ++
 .../api/table/CalciteConfigBuilderTest.scala    | 208 +++++++++++++++++++
 .../flink/api/table/TableEnvironmentTest.scala  |   5 +-
 8 files changed, 496 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dc0b3430/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
index 10c2450..1d34777 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
@@ -24,10 +24,9 @@ import org.apache.calcite.plan.RelOptPlanner.CannotPlanException
 import org.apache.calcite.plan.RelOptUtil
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.sql2rel.RelDecorrelator
-import org.apache.calcite.tools.Programs
-
+import org.apache.calcite.tools.{Programs, RuleSet}
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.{ExecutionEnvironment, DataSet}
+import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
 import org.apache.flink.api.java.io.DiscardingOutputFormat
 import org.apache.flink.api.java.typeutils.TypeExtractor
 import org.apache.flink.api.table.explain.PlanJsonParser
@@ -35,7 +34,7 @@ import org.apache.flink.api.table.expressions.Expression
 import org.apache.flink.api.table.plan.logical.{CatalogNode, LogicalRelNode}
 import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetRel}
 import org.apache.flink.api.table.plan.rules.FlinkRuleSets
-import org.apache.flink.api.table.plan.schema.{TableSourceTable, DataSetTable}
+import org.apache.flink.api.table.plan.schema.{DataSetTable, TableSourceTable}
 import org.apache.flink.api.table.sinks.{BatchTableSink, TableSink}
 import org.apache.flink.api.table.sources.BatchTableSource
 
@@ -229,6 +228,11 @@ abstract class BatchTableEnvironment(
   }
 
   /**
+    * Returns the built-in rule set of the batch environment: DATASET_OPT_RULES.
+    */
+  protected def getBuiltInRuleSet: RuleSet = FlinkRuleSets.DATASET_OPT_RULES
+
+  /**
     * Generates the optimized [[RelNode]] tree from the original relational node tree.
     *
     * @param relNode The original [[RelNode]] tree
@@ -240,7 +244,7 @@ abstract class BatchTableEnvironment(
     val decorPlan = RelDecorrelator.decorrelateQuery(relNode)
 
     // optimize the logical Flink plan
-    val optProgram = Programs.ofRules(FlinkRuleSets.DATASET_OPT_RULES)
+    val optProgram = Programs.ofRules(getRuleSet)
     val flinkOutputProps = relNode.getTraitSet.replace(DataSetConvention.INSTANCE).simplify()
 
     val dataSetPlan = try {

http://git-wip-us.apache.org/repos/asf/flink/blob/dc0b3430/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/CalciteConfig.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/CalciteConfig.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/CalciteConfig.scala
new file mode 100644
index 0000000..06b3edc
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/CalciteConfig.scala
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table
+
+import org.apache.calcite.plan.RelOptRule
+import org.apache.calcite.sql.SqlOperatorTable
+import org.apache.calcite.sql.parser.SqlParser
+import org.apache.calcite.sql.util.ChainedSqlOperatorTable
+import org.apache.calcite.tools.{RuleSets, RuleSet}
+import org.apache.flink.util.Preconditions
+
+import scala.collection.JavaConverters._
+
+/**
+  * Builder for creating a [[CalciteConfig]].
+  *
+  * For rule sets and SQL operator tables, a `replace*` call discards everything collected
+  * so far and marks the built-in counterpart as replaced; later `add*` calls extend the
+  * replacement.
+  */
+class CalciteConfigBuilder {
+  // true once replaceRuleSet() was called; never reset afterwards
+  private var replaceRules: Boolean = false
+  // collected rule sets, most recently added first
+  private var ruleSets: List[RuleSet] = Nil
+
+  // true once replaceSqlOperatorTable() was called; never reset afterwards
+  private var replaceOperatorTable: Boolean = false
+  // collected operator tables, most recently added first
+  private var operatorTables: List[SqlOperatorTable] = Nil
+
+  // custom parser configuration, None if the built-in one is kept
+  private var replaceSqlParserConfig: Option[SqlParser.Config] = None
+
+  /**
+    * Replaces the built-in rule set with the given rule set.
+    * Rule sets registered before this call are dropped.
+    */
+  def replaceRuleSet(replaceRuleSet: RuleSet): CalciteConfigBuilder = {
+    Preconditions.checkNotNull(replaceRuleSet)
+    ruleSets = List(replaceRuleSet)
+    replaceRules = true
+    this
+  }
+
+  /**
+    * Appends the given rule set to the built-in rule set.
+    */
+  def addRuleSet(addedRuleSet: RuleSet): CalciteConfigBuilder = {
+    Preconditions.checkNotNull(addedRuleSet)
+    ruleSets = addedRuleSet :: ruleSets
+    this
+  }
+
+  /**
+    * Replaces the built-in SQL operator table with the given table.
+    * Operator tables registered before this call are dropped.
+    */
+  def replaceSqlOperatorTable(replaceSqlOperatorTable: SqlOperatorTable): CalciteConfigBuilder = {
+    Preconditions.checkNotNull(replaceSqlOperatorTable)
+    operatorTables = List(replaceSqlOperatorTable)
+    replaceOperatorTable = true
+    this
+  }
+
+  /**
+    * Appends the given table to the built-in SQL operator table.
+    */
+  def addSqlOperatorTable(addedSqlOperatorTable: SqlOperatorTable): CalciteConfigBuilder = {
+    Preconditions.checkNotNull(addedSqlOperatorTable)
+    this.operatorTables = addedSqlOperatorTable :: this.operatorTables
+    this
+  }
+
+  /**
+    * Replaces the built-in SQL parser configuration with the given configuration.
+    */
+  def replaceSqlParserConfig(sqlParserConfig: SqlParser.Config): CalciteConfigBuilder = {
+    Preconditions.checkNotNull(sqlParserConfig)
+    replaceSqlParserConfig = Some(sqlParserConfig)
+    this
+  }
+
+  /**
+    * [[CalciteConfig]] implementation that exposes the values passed to its constructor.
+    */
+  private class CalciteConfigImpl(
+      val getRuleSet: Option[RuleSet],
+      val replacesRuleSet: Boolean,
+      val getSqlOperatorTable: Option[SqlOperatorTable],
+      val replacesSqlOperatorTable: Boolean,
+      val getSqlParserConfig: Option[SqlParser.Config])
+    extends CalciteConfig
+
+  /**
+    * Builds a new [[CalciteConfig]].
+    *
+    * Multiple rule sets are concatenated in the order in which they were added; multiple
+    * operator tables are chained with the most recently added table first.
+    */
+  def build(): CalciteConfig = new CalciteConfigImpl(
+    ruleSets match {
+      case Nil => None
+      case h :: Nil => Some(h)
+      case _ =>
+        // concat rule sets; the list is newest-first, so prepending restores insertion order
+        val concatRules = ruleSets.foldLeft(Nil: Iterable[RelOptRule])( (c, r) => r.asScala ++ c)
+        Some(RuleSets.ofList(concatRules.asJava))
+    },
+    this.replaceRules,
+    operatorTables match {
+      case Nil => None
+      case h :: Nil => Some(h)
+      case _ =>
+        // chain operator tables
+        Some(operatorTables.reduce( (x, y) => ChainedSqlOperatorTable.of(x, y)))
+    },
+    this.replaceOperatorTable,
+    replaceSqlParserConfig)
+}
+
+/**
+  * Custom Calcite settings (rules, operator table, parser config) for the Table and SQL API.
+  */
+trait CalciteConfig {
+  /**
+    * Returns whether the custom rule set replaces (rather than extends) the built-in rule set.
+    */
+  def replacesRuleSet: Boolean
+
+  /**
+    * Returns the custom rule set, or None if no rule set was configured.
+    */
+  def getRuleSet: Option[RuleSet]
+
+  /**
+    * Returns whether the custom SQL operator table replaces (rather than extends) the built-in one.
+    */
+  def replacesSqlOperatorTable: Boolean
+
+  /**
+    * Returns the custom SQL operator table, or None if no table was configured.
+    */
+  def getSqlOperatorTable: Option[SqlOperatorTable]
+
+  /**
+    * Returns the custom SQL parser configuration, or None if none was configured.
+    */
+  def getSqlParserConfig: Option[SqlParser.Config]
+}
+
+object CalciteConfig {
+
+  /**
+    * Configuration produced by an untouched builder: no custom rule set, SQL operator table
+    * or SQL parser configuration.
+    */
+  val DEFAULT: CalciteConfig = createBuilder().build()
+
+  /**
+    * Creates a fresh [[CalciteConfigBuilder]] for assembling a [[CalciteConfig]].
+    */
+  def createBuilder(): CalciteConfigBuilder = new CalciteConfigBuilder
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dc0b3430/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
index 15e3960..ac21834 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
@@ -24,8 +24,7 @@ import org.apache.calcite.plan.RelOptPlanner.CannotPlanException
 import org.apache.calcite.plan.RelOptUtil
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.sql2rel.RelDecorrelator
-import org.apache.calcite.tools.Programs
-
+import org.apache.calcite.tools.{Programs, RuleSet}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.table.expressions.Expression
 import org.apache.flink.api.table.plan.logical.{CatalogNode, LogicalRelNode}
@@ -204,6 +203,11 @@ abstract class StreamTableEnvironment(
   }
 
   /**
+    * Returns the built-in rule set of the stream environment: DATASTREAM_OPT_RULES.
+    */
+  protected def getBuiltInRuleSet: RuleSet = FlinkRuleSets.DATASTREAM_OPT_RULES
+
+  /**
     * Generates the optimized [[RelNode]] tree from the original relational node tree.
     *
     * @param relNode The root node of the relational expression tree.
@@ -214,7 +218,7 @@ abstract class StreamTableEnvironment(
     val decorPlan = RelDecorrelator.decorrelateQuery(relNode)
 
     // optimize the logical Flink plan
-    val optProgram = Programs.ofRules(FlinkRuleSets.DATASTREAM_OPT_RULES)
+    val optProgram = Programs.ofRules(getRuleSet)
     val flinkOutputProps = relNode.getTraitSet.replace(DataStreamConvention.INSTANCE).simplify()
 
     val dataStreamPlan = try {

http://git-wip-us.apache.org/repos/asf/flink/blob/dc0b3430/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableConfig.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableConfig.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableConfig.scala
index c92451d..37d9cb5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableConfig.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableConfig.scala
@@ -22,7 +22,7 @@ import java.util.TimeZone
 /**
  * A config to define the runtime behavior of the Table API.
  */
-class TableConfig extends Serializable {
+class TableConfig {
 
   /**
    * Defines the timezone for date/time/timestamp conversions.
@@ -41,6 +41,11 @@ class TableConfig extends Serializable {
   private var efficientTypeUsage = false
 
   /**
+    * Defines the configuration of Calcite for Table API and SQL queries.
+    */
+  private var calciteConfig = CalciteConfig.DEFAULT
+
+  /**
    * Sets the timezone for date/time/timestamp conversions.
    */
   def setTimeZone(timeZone: TimeZone): Unit = {
@@ -83,6 +88,18 @@ class TableConfig extends Serializable {
     this.efficientTypeUsage = efficientTypeUsage
   }
 
+  /**
+    * Returns the Calcite configuration currently set for Table API and SQL queries.
+    */
+  def getCalciteConfig: CalciteConfig = calciteConfig
+
+  /**
+    * Sets the configuration of Calcite for Table API and SQL queries.
+    * Changing the configuration has no effect after the first query has been defined.
+    *
+    * @param calciteConfig the new Calcite configuration, must not be null
+    */
+  def setCalciteConfig(calciteConfig: CalciteConfig): Unit = {
+    // fail fast: a null config would otherwise only surface as a NullPointerException
+    // when the table environment later reads the configuration
+    this.calciteConfig = java.util.Objects.requireNonNull(calciteConfig, "calciteConfig")
+  }
 }
 
 object TableConfig {

http://git-wip-us.apache.org/repos/asf/flink/blob/dc0b3430/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
index c3b728b..df97d2d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
@@ -29,7 +29,8 @@ import org.apache.calcite.schema.{Schemas, SchemaPlus}
 import org.apache.calcite.schema.impl.AbstractTable
 import org.apache.calcite.sql.SqlOperatorTable
 import org.apache.calcite.sql.parser.SqlParser
-import org.apache.calcite.tools.{FrameworkConfig, Frameworks}
+import org.apache.calcite.sql.util.ChainedSqlOperatorTable
+import org.apache.calcite.tools.{FrameworkConfig, Frameworks, RuleSet, RuleSets}
 import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
 import org.apache.flink.api.java.table.{BatchTableEnvironment => JavaBatchTableEnv, StreamTableEnvironment => JavaStreamTableEnv}
 import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo}
@@ -46,6 +47,8 @@ import org.apache.flink.api.table.validate.FunctionCatalog
 import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaStreamExecEnv}
 import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment => ScalaStreamExecEnv}
 
+import scala.collection.JavaConverters._
+
 /**
   * The abstract base class for batch and stream TableEnvironments.
   *
@@ -53,42 +56,31 @@ import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment => Scala
   */
 abstract class TableEnvironment(val config: TableConfig) {
 
-  // configure sql parser
-  // we use Java lex because back ticks are easier than double quotes in programming
-  // and cases are preserved
-  private val parserConfig = SqlParser
-    .configBuilder()
-    .setLex(Lex.JAVA)
-    .build()
-
   // the catalog to hold all registered and translated tables
   private val tables: SchemaPlus = Frameworks.createRootSchema(true)
 
   // Table API/SQL function catalog
   private val functionCatalog: FunctionCatalog = FunctionCatalog.withBuiltIns
 
-  // SQL operator and function catalog
-  private val sqlOperatorTable: SqlOperatorTable = functionCatalog.getSqlOperatorTable
-
   // the configuration to create a Calcite planner
-  private val frameworkConfig: FrameworkConfig = Frameworks
+  private lazy val frameworkConfig: FrameworkConfig = Frameworks
     .newConfigBuilder
     .defaultSchema(tables)
-    .parserConfig(parserConfig)
+    .parserConfig(getSqlParserConfig)
     .costFactory(new DataSetCostFactory)
     .typeSystem(new FlinkTypeSystem)
-    .operatorTable(sqlOperatorTable)
+    .operatorTable(getSqlOperatorTable)
     // set the executor to evaluate constant expressions
     .executor(new RexExecutorImpl(Schemas.createDataContext(null)))
     .build
 
   // the builder for Calcite RelNodes, Calcite's representation of a relational expression tree.
-  protected val relBuilder: FlinkRelBuilder = FlinkRelBuilder.create(frameworkConfig)
+  protected lazy val relBuilder: FlinkRelBuilder = FlinkRelBuilder.create(frameworkConfig)
 
   // the planner instance used to optimize queries of this TableEnvironment
-  private val planner: RelOptPlanner = relBuilder.getPlanner
+  private lazy val planner: RelOptPlanner = relBuilder.getPlanner
 
-  private val typeFactory: FlinkTypeFactory = relBuilder.getTypeFactory
+  private lazy val typeFactory: FlinkTypeFactory = relBuilder.getTypeFactory
 
   // a counter for unique attribute names
   private val attrNameCntr: AtomicInteger = new AtomicInteger(0)
@@ -97,6 +89,69 @@ abstract class TableEnvironment(val config: TableConfig) {
   def getConfig = config
 
   /**
+    * Returns the operator table for this environment including a custom Calcite configuration.
+    */
+  protected def getSqlOperatorTable: SqlOperatorTable = {
+    val customConfig = config.getCalciteConfig
+    // a replacing custom table is used alone, a non-replacing one is chained behind the
+    // function catalog's table; without a custom table the catalog's table is returned
+    customConfig.getSqlOperatorTable
+      .map { customTable =>
+        if (customConfig.replacesSqlOperatorTable) {
+          customTable
+        } else {
+          ChainedSqlOperatorTable.of(functionCatalog.getSqlOperatorTable, customTable)
+        }
+      }
+      .getOrElse(functionCatalog.getSqlOperatorTable)
+  }
+
+  /**
+    * Returns the rule set for this environment including a custom Calcite configuration.
+    */
+  protected def getRuleSet: RuleSet = {
+    val customConfig = config.getCalciteConfig
+    // without a custom rule set the built-in rules are used as they are
+    customConfig.getRuleSet.fold(getBuiltInRuleSet) { customRules =>
+      if (customConfig.replacesRuleSet) {
+        customRules
+      } else {
+        // built-in rules first, custom rules appended
+        RuleSets.ofList((getBuiltInRuleSet.asScala ++ customRules.asScala).asJava)
+      }
+    }
+  }
+
+  /**
+    * Returns the SQL parser config for this environment including a custom Calcite configuration.
+    */
+  protected def getSqlParserConfig: SqlParser.Config = {
+    // getOrElse evaluates its argument by name: the default is only built if no custom
+    // parser configuration is present
+    config.getCalciteConfig.getSqlParserConfig.getOrElse {
+      // we use Java lex because back ticks are easier than double quotes in programming
+      // and cases are preserved
+      SqlParser
+        .configBuilder()
+        .setLex(Lex.JAVA)
+        .build()
+    }
+  }
+
+  /**
+    * Returns the built-in rules that are defined by the environment.
+    */
+  protected def getBuiltInRuleSet: RuleSet
+
+  /**
     * Registers a [[UserDefinedFunction]] under a unique name. Replaces already existing
     * user-defined functions under this name.
     */

http://git-wip-us.apache.org/repos/asf/flink/blob/dc0b3430/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/TableEnvironmentITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/TableEnvironmentITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/TableEnvironmentITCase.java
index 5e40724..2d662d6 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/TableEnvironmentITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/TableEnvironmentITCase.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import java.util.List;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.calcite.tools.RuleSets;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.table.BatchTableEnvironment;
@@ -33,6 +34,8 @@ import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase;
+import org.apache.flink.api.table.CalciteConfig;
+import org.apache.flink.api.table.CalciteConfigBuilder;
 import org.apache.flink.api.table.Row;
 import org.apache.flink.api.table.Table;
 import org.apache.flink.api.table.TableEnvironment;
@@ -436,6 +439,19 @@ public class TableEnvironmentITCase extends TableProgramsTestBase {
 		tableEnv.toDataSet(t, MyNonStatic.class);
 	}
 
+	/** Replaces the built-in rules with an empty rule set; the test expects a TableException. */
+	@Test(expected = TableException.class)
+	public void testCustomCalciteConfig() {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		// install a configuration whose replacement rule set contains no rules
+		CalciteConfigBuilder builder = new CalciteConfigBuilder();
+		CalciteConfig emptyRules = builder.replaceRuleSet(RuleSets.ofList()).build();
+		tableEnv.getConfig().setCalciteConfig(emptyRules);
+
+		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+		Table table = tableEnv.fromDataSet(input);
+		tableEnv.toDataSet(table, Row.class);
+	}
+
 	// --------------------------------------------------------------------------------------------
 
 	public class MyNonStatic {

http://git-wip-us.apache.org/repos/asf/flink/blob/dc0b3430/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/CalciteConfigBuilderTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/CalciteConfigBuilderTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/CalciteConfigBuilderTest.scala
new file mode 100644
index 0000000..2b0d446
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/CalciteConfigBuilderTest.scala
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table
+
+import org.apache.calcite.rel.rules.{CalcSplitRule, CalcMergeRule, FilterMergeRule}
+import org.apache.calcite.sql.fun.{SqlStdOperatorTable, OracleSqlOperatorTable}
+import org.apache.calcite.tools.RuleSets
+import org.junit.Test
+import org.junit.Assert._
+
+import scala.collection.JavaConverters._
+
+/**
+  * Tests for [[CalciteConfigBuilder]]: checks the replace flags and the resulting rule sets
+  * and SQL operator tables for combinations of replace and add calls.
+  */
+class CalciteConfigBuilderTest {
+
+  @Test
+  def testDefaultRules(): Unit = {
+
+    val cc: CalciteConfig = new CalciteConfigBuilder()
+      .build()
+
+    assertEquals(false, cc.replacesRuleSet)
+    assertFalse(cc.getRuleSet.isDefined)
+  }
+
+  @Test
+  def testReplaceRules(): Unit = {
+
+    val cc: CalciteConfig = new CalciteConfigBuilder()
+      .replaceRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
+      .build()
+
+    assertEquals(true, cc.replacesRuleSet)
+    assertTrue(cc.getRuleSet.isDefined)
+    val cSet = cc.getRuleSet.get.iterator().asScala.toSet
+    assertEquals(1, cSet.size)
+    assertTrue(cSet.contains(FilterMergeRule.INSTANCE))
+  }
+
+  @Test
+  def testReplaceAddRules(): Unit = {
+
+    val cc: CalciteConfig = new CalciteConfigBuilder()
+      .replaceRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
+      .addRuleSet(RuleSets.ofList(CalcMergeRule.INSTANCE, CalcSplitRule.INSTANCE))
+      .build()
+
+    assertEquals(true, cc.replacesRuleSet)
+    assertTrue(cc.getRuleSet.isDefined)
+    val cSet = cc.getRuleSet.get.iterator().asScala.toSet
+    assertEquals(3, cSet.size)
+    assertTrue(cSet.contains(FilterMergeRule.INSTANCE))
+    assertTrue(cSet.contains(CalcMergeRule.INSTANCE))
+    assertTrue(cSet.contains(CalcSplitRule.INSTANCE))
+  }
+
+  @Test
+  def testAddRules(): Unit = {
+
+    val cc: CalciteConfig = new CalciteConfigBuilder()
+      .addRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
+      .build()
+
+    assertEquals(false, cc.replacesRuleSet)
+    assertTrue(cc.getRuleSet.isDefined)
+    val cSet = cc.getRuleSet.get.iterator().asScala.toSet
+    assertEquals(1, cSet.size)
+    assertTrue(cSet.contains(FilterMergeRule.INSTANCE))
+  }
+
+  @Test
+  def testAddAddRules(): Unit = {
+
+    val cc: CalciteConfig = new CalciteConfigBuilder()
+      .addRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
+      .addRuleSet(RuleSets.ofList(CalcMergeRule.INSTANCE, CalcSplitRule.INSTANCE))
+      .build()
+
+    assertEquals(false, cc.replacesRuleSet)
+    assertTrue(cc.getRuleSet.isDefined)
+    val cSet = cc.getRuleSet.get.iterator().asScala.toSet
+    assertEquals(3, cSet.size)
+    assertTrue(cSet.contains(FilterMergeRule.INSTANCE))
+    assertTrue(cSet.contains(CalcMergeRule.INSTANCE))
+    assertTrue(cSet.contains(CalcSplitRule.INSTANCE))
+  }
+
+  @Test
+  def testDefaultOperatorTable(): Unit = {
+
+    val cc: CalciteConfig = new CalciteConfigBuilder()
+      .build()
+
+    assertEquals(false, cc.replacesSqlOperatorTable)
+    assertFalse(cc.getSqlOperatorTable.isDefined)
+  }
+
+  // the annotation was missing here, so JUnit never executed this test
+  @Test
+  def testReplaceOperatorTable(): Unit = {
+
+    val oracleTable = new OracleSqlOperatorTable
+
+    val cc: CalciteConfig = new CalciteConfigBuilder()
+      .replaceSqlOperatorTable(oracleTable)
+      .build()
+
+    val oracleOps = oracleTable.getOperatorList.asScala
+
+    assertEquals(true, cc.replacesSqlOperatorTable)
+    assertTrue(cc.getSqlOperatorTable.isDefined)
+    val ops = cc.getSqlOperatorTable.get.getOperatorList
+      .asScala.toSet
+    assertEquals(oracleOps.size, ops.size)
+    for (o <- oracleOps) {
+      assertTrue(ops.contains(o))
+    }
+  }
+
+  // the annotation was missing here, so JUnit never executed this test
+  @Test
+  def testReplaceAddOperatorTable(): Unit = {
+
+    val oracleTable = new OracleSqlOperatorTable
+    val stdTable = new SqlStdOperatorTable
+
+    val cc: CalciteConfig = new CalciteConfigBuilder()
+      .replaceSqlOperatorTable(oracleTable)
+      .addSqlOperatorTable(stdTable)
+      .build()
+
+    val oracleOps = oracleTable.getOperatorList.asScala
+    val stdOps = stdTable.getOperatorList.asScala
+
+    assertEquals(true, cc.replacesSqlOperatorTable)
+    assertTrue(cc.getSqlOperatorTable.isDefined)
+    val ops = cc.getSqlOperatorTable.get.getOperatorList
+      .asScala.toSet
+    assertEquals(oracleOps.size + stdOps.size, ops.size)
+    for (o <- oracleOps) {
+      assertTrue(ops.contains(o))
+    }
+    for (o <- stdOps) {
+      assertTrue(ops.contains(o))
+    }
+
+  }
+
+  // the annotation was missing here, so JUnit never executed this test
+  @Test
+  def testAddOperatorTable(): Unit = {
+
+    val oracleTable = new OracleSqlOperatorTable
+
+    val cc: CalciteConfig = new CalciteConfigBuilder()
+      .addSqlOperatorTable(oracleTable)
+      .build()
+
+    val oracleOps = oracleTable.getOperatorList.asScala
+
+    assertEquals(false, cc.replacesSqlOperatorTable)
+    assertTrue(cc.getSqlOperatorTable.isDefined)
+    val ops = cc.getSqlOperatorTable.get.getOperatorList
+      .asScala.toSet
+    assertEquals(oracleOps.size, ops.size)
+    for (o <- oracleOps) {
+      assertTrue(ops.contains(o))
+    }
+  }
+
+  // the annotation was missing here, so JUnit never executed this test
+  @Test
+  def testAddAddOperatorTable(): Unit = {
+
+    val oracleTable = new OracleSqlOperatorTable
+    val stdTable = new SqlStdOperatorTable
+
+    val cc: CalciteConfig = new CalciteConfigBuilder()
+      .addSqlOperatorTable(oracleTable)
+      .addSqlOperatorTable(stdTable)
+      .build()
+
+    val oracleOps = oracleTable.getOperatorList.asScala
+    val stdOps = stdTable.getOperatorList.asScala
+
+    assertEquals(false, cc.replacesSqlOperatorTable)
+    assertTrue(cc.getSqlOperatorTable.isDefined)
+    val ops = cc.getSqlOperatorTable.get.getOperatorList
+      .asScala.toSet
+    assertEquals(oracleOps.size + stdOps.size, ops.size)
+    for (o <- oracleOps) {
+      assertTrue(ops.contains(o))
+    }
+    for (o <- stdOps) {
+      assertTrue(ops.contains(o))
+    }
+
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dc0b3430/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/TableEnvironmentTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/TableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/TableEnvironmentTest.scala
index 263696b..db86ef3 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/TableEnvironmentTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/TableEnvironmentTest.scala
@@ -18,10 +18,11 @@
 
 package org.apache.flink.api.table
 
+import org.apache.calcite.tools.RuleSet
 import org.apache.flink.api.scala._
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.{TypeExtractor, TupleTypeInfo}
+import org.apache.flink.api.java.typeutils.{TupleTypeInfo, TypeExtractor}
 import org.apache.flink.api.table.expressions.{Alias, UnresolvedFieldReference}
 import org.apache.flink.api.table.sinks.TableSink
 import org.junit.Test
@@ -279,6 +280,8 @@ class MockTableEnvironment extends TableEnvironment(new TableConfig) {
 
   override protected def checkValidTableName(name: String): Unit = ???
 
+  override protected def getBuiltInRuleSet: RuleSet = ???
+
   override def sql(query: String): Table = ???
 }
 


Mime
View raw message