flink-commits mailing list archives

From fhue...@apache.org
Subject [2/4] flink git commit: [hotfix] [tableAPI] Fix SQL queries on TableSources.
Date Tue, 10 May 2016 20:31:15 GMT
[hotfix] [tableAPI] Fix SQL queries on TableSources.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7ed07933
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7ed07933
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7ed07933

Branch: refs/heads/master
Commit: 7ed07933d2dd3cf41948287dc8fd79dbef902311
Parents: 15f5211
Author: Fabian Hueske <fhueske@apache.org>
Authored: Wed May 4 19:03:29 2016 +0200
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Tue May 10 18:58:31 2016 +0200

----------------------------------------------------------------------
 .../api/table/StreamTableEnvironment.scala      |  7 +--
 .../datastream/StreamTableSourceScan.scala      |  4 +-
 .../api/table/plan/rules/FlinkRuleSets.scala    |  8 +--
 .../plan/rules/LogicalScanToStreamable.scala    | 56 --------------------
 .../plan/rules/datastream/RemoveDeltaRule.scala | 42 +++++++++++++++
 .../datastream/StreamTableSourceScanRule.scala  |  9 ++--
 .../schema/StreamableTableSourceTable.scala     | 30 +++++++++++
 .../table/plan/schema/TableSourceTable.scala    |  4 +-
 .../table/plan/schema/TransStreamTable.scala    |  5 --
 .../api/scala/stream/TableSourceITCase.scala    |  2 +-
 10 files changed, 87 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7ed07933/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
index 918a65f..be1c005 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
@@ -29,8 +29,9 @@ import org.apache.flink.api.table.expressions.Expression
 import org.apache.flink.api.table.plan.PlanGenException
 import org.apache.flink.api.table.plan.nodes.datastream.{DataStreamRel, DataStreamConvention}
 import org.apache.flink.api.table.plan.rules.FlinkRuleSets
-import org.apache.flink.api.table.plan.schema.{TableSourceTable, TransStreamTable, DataStreamTable}
 import org.apache.flink.api.table.sinks.{StreamTableSink, TableSink}
+import org.apache.flink.api.table.plan.schema.
+  {StreamableTableSourceTable, TransStreamTable, DataStreamTable}
 import org.apache.flink.api.table.sources.StreamTableSource
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
@@ -110,7 +111,7 @@ abstract class StreamTableEnvironment(
   def registerTableSource(name: String, tableSource: StreamTableSource[_]): Unit = {
 
     checkValidTableName(name)
-    registerTableInternal(name, new TableSourceTable(tableSource))
+    registerTableInternal(name, new StreamableTableSourceTable(tableSource))
   }
 
   /**
@@ -179,7 +180,7 @@ abstract class StreamTableEnvironment(
       fieldIndexes,
       fieldNames
     )
-    // when registering a DataStream, we need to wrap it into a StreamableTable
+    // when registering a DataStream, we need to wrap it into a TransStreamTable
     // so that the SQL validation phase won't fail
     if (wrapper) {
       registerTableInternal(name, dataStreamTable)

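For context, the user-facing path that this hunk fixes looks roughly like the sketch below, pieced together from the test change at the bottom of this commit. Only registerTableSource, sql, toDataStream and the query text come from this commit; MyStreamSource, the TableEnvironment.getTableEnvironment factory and the imports are assumptions about the 1.1-era API, shown for illustration only.

    import org.apache.flink.api.scala._
    import org.apache.flink.api.scala.table._
    import org.apache.flink.api.table.{Row, TableEnvironment}
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    // Sketch only: MyStreamSource stands for any StreamTableSource[_] implementation,
    // e.g. the TestStreamTableSource used in TableSourceITCase further down.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // With this fix the source is registered as a StreamableTableSourceTable,
    // so the streaming SQL query below passes validation and gets translated.
    tEnv.registerTableSource("MyTestTable", new MyStreamSource)

    tEnv.sql("SELECT STREAM amount * id, name FROM MyTestTable WHERE amount < 4")
      .toDataStream[Row]
      .print()
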
http://git-wip-us.apache.org/repos/asf/flink/blob/7ed07933/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamTableSourceScan.scala
index 21b8a63..2c7a584 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamTableSourceScan.scala
@@ -23,7 +23,7 @@ import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.table.StreamTableEnvironment
-import org.apache.flink.api.table.plan.schema.TableSourceTable
+import org.apache.flink.api.table.plan.schema.StreamableTableSourceTable
 import org.apache.flink.api.table.sources.StreamTableSource
 import org.apache.flink.streaming.api.datastream.DataStream
 
@@ -35,7 +35,7 @@ class StreamTableSourceScan(
     rowType: RelDataType)
   extends StreamScan(cluster, traitSet, table, rowType) {
 
-  val tableSourceTable = table.unwrap(classOf[TableSourceTable])
+  val tableSourceTable = table.unwrap(classOf[StreamableTableSourceTable])
   val tableSource = tableSourceTable.tableSource.asInstanceOf[StreamTableSource[_]]
 
   override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {

http://git-wip-us.apache.org/repos/asf/flink/blob/7ed07933/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
index 5d5912b..4ce8e5f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
@@ -110,12 +110,10 @@ object FlinkRuleSets {
   /**
  * RuleSet to optimize plans for streaming / DataStream execution
   */
-  val DATASTREAM_OPT_RULES: RuleSet = {
-
-    val rules = List(
+  val DATASTREAM_OPT_RULES: RuleSet = RuleSets.ofList(
 
+      RemoveDeltaRule.INSTANCE,
       EnumerableToLogicalTableScan.INSTANCE,
-      LogicalScanToStreamable.INSTANCE,
 
       // calc rules
       FilterToCalcRule.INSTANCE,
@@ -148,6 +146,4 @@ object FlinkRuleSets {
       StreamTableSourceScanRule.INSTANCE
   )
 
-    RuleSets.ofList(rules ++ StreamRules.RULES.asList.take(7))
-  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7ed07933/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/LogicalScanToStreamable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/LogicalScanToStreamable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/LogicalScanToStreamable.scala
deleted file mode 100644
index 3b389bc..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/LogicalScanToStreamable.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.rules
-
-import org.apache.calcite.plan.RelOptRule._
-import org.apache.calcite.plan.{RelOptRuleCall, RelOptRule, RelOptRuleOperand}
-import org.apache.calcite.prepare.RelOptTableImpl
-import org.apache.calcite.rel.logical.LogicalTableScan
-import org.apache.calcite.schema.StreamableTable
-import org.apache.flink.api.table.plan.schema.TransStreamTable
-
-/**
-  * Custom rule that converts a LogicalScan into another LogicalScan
-  * whose internal Table is [[StreamableTable]] and [[org.apache.calcite.schema.TranslatableTable]].
-  */
-class LogicalScanToStreamable(
-    operand: RelOptRuleOperand,
-    description: String) extends RelOptRule(operand, description) {
-
-  override def onMatch(call: RelOptRuleCall): Unit = {
-    val oldRel = call.rel(0).asInstanceOf[LogicalTableScan]
-    val table = oldRel.getTable
-    table.unwrap(classOf[StreamableTable]) match {
-      case s: StreamableTable =>
-        // already a StreamableTable => do nothing
-      case _ => // convert to a StreamableTable
-        val sTable = new TransStreamTable(oldRel, false)
-        val newRel = LogicalTableScan.create(oldRel.getCluster,
-          RelOptTableImpl.create(table.getRelOptSchema, table.getRowType, sTable))
-        call.transformTo(newRel)
-    }
-  }
-}
-
-object LogicalScanToStreamable {
-  val INSTANCE = new LogicalScanToStreamable(
-    operand(classOf[LogicalTableScan], any),
-    "LogicalScanToStreamable")
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/7ed07933/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/RemoveDeltaRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/RemoveDeltaRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/RemoveDeltaRule.scala
new file mode 100644
index 0000000..7b4720a
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/RemoveDeltaRule.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.datastream
+
+import org.apache.calcite.plan.RelOptRule._
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.stream.LogicalDelta
+
+/**
+ * Rule that removes a [[LogicalDelta]] node from the plan.
+ * Calcite wraps the scan of a streaming (SELECT STREAM) query in a
+ * LogicalDelta node when parsing the SQL. We remove it so that the
+ * query can be optimized and translated together with any plan that
+ * might be created by the Table API.
+ */
+class RemoveDeltaRule extends RelOptRule(operand(classOf[LogicalDelta], any), "RemoveDeltaRule") {
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+    val delta = call.rel(0).asInstanceOf[LogicalDelta]
+    call.transformTo(delta.getInput)
+  }
+}
+
+object RemoveDeltaRule {
+  val INSTANCE = new RemoveDeltaRule()
+}

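To illustrate what the new rule does: the plan shape below is an assumption about how Calcite parses a SELECT STREAM query, not output captured from this commit. The rule simply strips the Delta node so that the remaining scan can be picked up by StreamTableSourceScanRule.

    // Illustrative only. A query such as
    //   SELECT STREAM amount * id, name FROM MyTestTable WHERE amount < 4
    // is parsed by Calcite into roughly
    //   LogicalProject
    //     LogicalFilter
    //       LogicalDelta          <-- inserted for the STREAM keyword
    //         LogicalTableScan
    // RemoveDeltaRule matches the LogicalDelta and replaces it with its input,
    // leaving a plain LogicalTableScan for the downstream conversion rules.
    import org.apache.calcite.tools.RuleSets
    import org.apache.flink.api.table.plan.rules.datastream.RemoveDeltaRule

    val rules = RuleSets.ofList(RemoveDeltaRule.INSTANCE)  // as wired into FlinkRuleSets above
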
http://git-wip-us.apache.org/repos/asf/flink/blob/7ed07933/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/StreamTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/StreamTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/StreamTableSourceScanRule.scala
index 5ecf994..8000cde 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/StreamTableSourceScanRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/StreamTableSourceScanRule.scala
@@ -23,8 +23,9 @@ import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
 import org.apache.calcite.rel.core.TableScan
 import org.apache.calcite.rel.logical.LogicalTableScan
-import org.apache.flink.api.table.plan.nodes.datastream.{StreamTableSourceScan, DataStreamConvention}
-import org.apache.flink.api.table.plan.schema.TableSourceTable
+import org.apache.flink.api.table.plan.nodes.datastream.
+  {StreamTableSourceScan, DataStreamConvention}
+import org.apache.flink.api.table.plan.schema.StreamableTableSourceTable
 import org.apache.flink.api.table.sources.StreamTableSource
 
 /** Rule to convert a [[LogicalTableScan]] into a [[StreamTableSourceScan]]. */
@@ -39,9 +40,9 @@ class StreamTableSourceScanRule
   /** Rule must only match if TableScan targets a [[StreamTableSource]] */
   override def matches(call: RelOptRuleCall): Boolean = {
     val scan: TableScan = call.rel(0).asInstanceOf[TableScan]
-    val dataSetTable = scan.getTable.unwrap(classOf[TableSourceTable])
+    val dataSetTable = scan.getTable.unwrap(classOf[StreamableTableSourceTable])
     dataSetTable match {
-      case tst: TableSourceTable =>
+      case tst: StreamableTableSourceTable =>
         tst.tableSource match {
           case _: StreamTableSource[_] =>
             true

http://git-wip-us.apache.org/repos/asf/flink/blob/7ed07933/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/StreamableTableSourceTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/StreamableTableSourceTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/StreamableTableSourceTable.scala
new file mode 100644
index 0000000..58214bc
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/StreamableTableSourceTable.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.schema
+
+import org.apache.calcite.schema.{Table, StreamableTable}
+import org.apache.flink.api.table.sources.TableSource
+
+/** Table which defines an external streamable table via a [[TableSource]] */
+class StreamableTableSourceTable(tableSource: TableSource[_])
+  extends TableSourceTable(tableSource)
+  with StreamableTable {
+
+  override def stream(): Table = new TableSourceTable(tableSource)
+}

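The StreamableTable mix-in is the part that matters for SQL validation: Calcite only accepts a table in a SELECT STREAM query if it implements org.apache.calcite.schema.StreamableTable, whose stream() method returns the table to scan. For illustration, a source that registerTableSource() would wrap into this class might look roughly as follows. getFieldTypes, getFieldsNames and getNumberOfFields appear in the TableSourceTable diff just below; getDataStream, getReturnType and the exact signatures are assumptions about the 1.1-era StreamTableSource/TableSource interfaces.

    import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
    import org.apache.flink.api.table.sources.StreamTableSource
    import org.apache.flink.streaming.api.datastream.DataStream
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

    // Hedged sketch of a user-defined streaming source; signatures are
    // approximations of the 1.1-era interfaces, not taken from this commit.
    class WordSource extends StreamTableSource[String] {

      override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[String] =
        execEnv.fromElements("hello", "world")

      override def getNumberOfFields: Int = 1
      override def getFieldsNames: Array[String] = Array("word")
      override def getFieldTypes: Array[TypeInformation[_]] =
        Array[TypeInformation[_]](BasicTypeInfo.STRING_TYPE_INFO)
      override def getReturnType: TypeInformation[String] = BasicTypeInfo.STRING_TYPE_INFO
    }
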
http://git-wip-us.apache.org/repos/asf/flink/blob/7ed07933/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TableSourceTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TableSourceTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TableSourceTable.scala
index 042c823..03646f9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TableSourceTable.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TableSourceTable.scala
@@ -27,6 +27,4 @@ class TableSourceTable(val tableSource: TableSource[_])
   extends FlinkTable[Row](
     typeInfo = new RowTypeInfo(tableSource.getFieldTypes, tableSource.getFieldsNames),
     fieldIndexes = 0.until(tableSource.getNumberOfFields).toArray,
-    fieldNames = tableSource.getFieldsNames.toArray) {
-
-}
+    fieldNames = tableSource.getFieldsNames)

http://git-wip-us.apache.org/repos/asf/flink/blob/7ed07933/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TransStreamTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TransStreamTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TransStreamTable.scala
index bc27659..61f2598 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TransStreamTable.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TransStreamTable.scala
@@ -33,11 +33,6 @@ import org.apache.calcite.schema.{StreamableTable, Table, TranslatableTable}
   * can be converted to a relational expression and [[StreamableTable]]
   * so that it can be used in Streaming SQL queries.
   *
-  * Except for registering Streaming Tables, this implementation is also used
-  * in [[org.apache.flink.api.table.plan.rules.LogicalScanToStreamable]]
-  * rule to convert a logical scan of a non-Streamable Table into
-  * a logical scan of a Streamable table, i.e. of this class.
-  *
   * @see [[DataStreamTable]]
   */
 class TransStreamTable(relNode: RelNode, wrapper: Boolean)

http://git-wip-us.apache.org/repos/asf/flink/blob/7ed07933/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/TableSourceITCase.scala
index 687cc28..50d9a42 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/TableSourceITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/TableSourceITCase.scala
@@ -70,7 +70,7 @@ class TableSourceITCase extends StreamingMultipleProgramsTestBase {
 
     tEnv.registerTableSource("MyTestTable", new TestStreamTableSource(33))
     tEnv.sql(
-      "SELECT amount * id, name FROM MyTestTable WHERE amount < 4")
+      "SELECT STREAM amount * id, name FROM MyTestTable WHERE amount < 4")
       .toDataStream[Row]
       .addSink(new StreamITCase.StringSink)
 

