flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From va...@apache.org
Subject [2/2] flink git commit: [FLINK-3547] add support for DataStreamTable
Date Tue, 29 Mar 2016 11:04:02 GMT
[FLINK-3547] add support for DataStreamTable

- add Java and Scala stream translators

- add DataStream rules for calc and scan

- add tests and streaming test utils

- add support for streaming union

- move code generation from the calc rule to the calc node
and remove unnecessary rules

- refactoring to reuse code between dataset and datastream translation

This closes #1820


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f773fe05
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f773fe05
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f773fe05

Branch: refs/heads/master
Commit: f773fe05d1663ffaa92c2328c07e68a645d8b553
Parents: 4405235
Author: vasia <vasia@apache.org>
Authored: Mon Feb 29 15:18:23 2016 +0100
Committer: vasia <vasia@apache.org>
Committed: Tue Mar 29 13:03:25 2016 +0200

----------------------------------------------------------------------
 flink-libraries/flink-table/pom.xml             |   8 +
 .../api/java/table/JavaStreamTranslator.scala   |  99 +++++++++++
 .../flink/api/java/table/TableEnvironment.scala |  53 +++++-
 .../api/scala/table/DataStreamConversions.scala |  67 ++++++++
 .../api/scala/table/ScalaStreamTranslator.scala |  51 ++++++
 .../api/scala/table/TableConversions.scala      |  16 +-
 .../api/scala/table/TableEnvironment.scala      |  36 ++++
 .../apache/flink/api/scala/table/package.scala  |  19 +-
 .../api/table/plan/TranslationContext.scala     |  14 +-
 .../flink/api/table/plan/nodes/FlinkCalc.scala  | 172 +++++++++++++++++++
 .../flink/api/table/plan/nodes/FlinkRel.scala   |  47 +++++
 .../table/plan/nodes/dataset/DataSetCalc.scala  | 130 ++------------
 .../table/plan/nodes/dataset/DataSetRel.scala   |  27 +--
 .../plan/nodes/datastream/DataStreamCalc.scala  | 101 +++++++++++
 .../nodes/datastream/DataStreamConvention.scala |  42 +++++
 .../plan/nodes/datastream/DataStreamRel.scala   |  46 +++++
 .../nodes/datastream/DataStreamSource.scala     | 123 +++++++++++++
 .../plan/nodes/datastream/DataStreamUnion.scala |  74 ++++++++
 .../api/table/plan/rules/FlinkRuleSets.scala    |  39 ++++-
 .../rules/datastream/DataStreamCalcRule.scala   |  53 ++++++
 .../rules/datastream/DataStreamScanRule.scala   |  51 ++++++
 .../rules/datastream/DataStreamUnionRule.scala  |  53 ++++++
 .../api/table/plan/schema/DataSetTable.scala    |  49 +-----
 .../api/table/plan/schema/DataStreamTable.scala |  37 ++++
 .../api/table/plan/schema/FlinkTable.scala      |  72 ++++++++
 .../table/streaming/test/FilterITCase.scala     | 135 +++++++++++++++
 .../table/streaming/test/SelectITCase.scala     | 168 ++++++++++++++++++
 .../table/streaming/test/UnionITCase.scala      | 104 +++++++++++
 .../streaming/test/utils/StreamITCase.scala     |  41 +++++
 .../streaming/test/utils/StreamTestData.scala   |  87 ++++++++++
 .../api/scala/table/test/SelectITCase.scala     |   4 +-
 31 files changed, 1823 insertions(+), 195 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f773fe05/flink-libraries/flink-table/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/pom.xml b/flink-libraries/flink-table/pom.xml
index a9a9238..088df98 100644
--- a/flink-libraries/flink-table/pom.xml
+++ b/flink-libraries/flink-table/pom.xml
@@ -54,6 +54,14 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-examples-batch_2.10</artifactId>
 			<version>${project.version}</version>
 		</dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/f773fe05/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaStreamTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaStreamTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaStreamTranslator.scala
new file mode 100644
index 0000000..e4b8ca0
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaStreamTranslator.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.table
+
+import org.apache.calcite.plan.RelOptPlanner.CannotPlanException
+import org.apache.calcite.plan.{RelTraitSet, RelOptUtil}
+import org.apache.calcite.rel.{RelCollations, RelNode}
+import org.apache.calcite.sql2rel.RelDecorrelator
+import org.apache.calcite.tools.Programs
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.datastream.{DataStream => JavaDataStream}
+import org.apache.flink.api.table.plan._
+import org.apache.flink.api.table.{TableConfig, Table}
+import org.apache.flink.api.table.plan.nodes.datastream.{DataStreamRel, DataStreamConvention}
+import org.apache.flink.api.table.plan.rules.FlinkRuleSets
+import org.apache.flink.api.table.plan.schema.DataStreamTable
+
+/**
+ * [[PlanTranslator]] for creating [[Table]]s
+ * from Java [[org.apache.flink.streaming.api.datastream.DataStream]]s and
+ * translating them back to Java [[org.apache.flink.streaming.api.datastream.DataStream]]s.
+ */
+class JavaStreamTranslator(config: TableConfig) extends PlanTranslator {
+
+  type Representation[A] = JavaDataStream[A]
+
+  override def createTable[A](
+      repr: Representation[A],
+      fieldIndexes: Array[Int],
+      fieldNames: Array[String]): Table = {
+
+    // create table representation from DataStream
+    val dataStreamTable = new DataStreamTable[A](
+      repr.asInstanceOf[JavaDataStream[A]],
+      fieldIndexes,
+      fieldNames
+    )
+
+    val tabName = TranslationContext.addDataStream(dataStreamTable)
+    val relBuilder = TranslationContext.getRelBuilder
+
+    // create table scan operator
+    relBuilder.scan(tabName)
+    new Table(relBuilder.build(), relBuilder)
+  }
+
+  override def translate[A](lPlan: RelNode)(implicit tpe: TypeInformation[A]): JavaDataStream[A] = {
+
+    // get the planner for the plan
+    val planner = lPlan.getCluster.getPlanner
+
+    // decorrelate
+    val decorPlan = RelDecorrelator.decorrelateQuery(lPlan)
+
+    // optimize the logical Flink plan
+    val optProgram = Programs.ofRules(FlinkRuleSets.DATASTREAM_OPT_RULES)
+    val flinkOutputProps = RelTraitSet.createEmpty()
+      .plus(DataStreamConvention.INSTANCE)
+      .plus(RelCollations.of()).simplify()
+
+    val dataStreamPlan = try {
+      optProgram.run(planner, decorPlan, flinkOutputProps)
+    }
+    catch {
+      case e: CannotPlanException =>
+        throw new PlanGenException(
+          s"Cannot generate a valid execution plan for the given query: \n\n" +
+            s"${RelOptUtil.toString(lPlan)}\n" +
+            "Please consider filing a bug report.", e)
+    }
+
+    dataStreamPlan match {
+      case node: DataStreamRel =>
+        node.translateToPlan(
+          config,
+          Some(tpe.asInstanceOf[TypeInformation[Any]])
+        ).asInstanceOf[JavaDataStream[A]]
+      case _ => ???
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f773fe05/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/TableEnvironment.scala
index e0d88a3..90954c6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/TableEnvironment.scala
@@ -22,11 +22,12 @@ import org.apache.flink.api.java.DataSet
 import org.apache.flink.api.java.typeutils.TypeExtractor
 import org.apache.flink.api.table.expressions.ExpressionParser
 import org.apache.flink.api.table.{AbstractTableEnvironment, Table}
+import org.apache.flink.streaming.api.datastream.DataStream
 
 /**
  * Environment for working with the Table API.
  *
- * This can be used to convert a [[DataSet]] to a [[Table]] and back again. You
+ * This can be used to convert a [[DataSet]] or a [[DataStream]] to a [[Table]] and back again. You
  * can also use the provided methods to create a [[Table]] directly from a data source.
  */
 class TableEnvironment extends AbstractTableEnvironment {
@@ -106,4 +107,54 @@ class TableEnvironment extends AbstractTableEnvironment {
     registerDataSetInternal(name, dataset, exprs)
   }
 
+    /**
+   * Transforms the given DataStream to a [[org.apache.flink.api.table.Table]].
+   * The fields of the DataStream type are renamed to the given set of fields:
+   *
+   * Example:
+   *
+   * {{{
+   *   tableEnv.fromDataStream(stream, "a, b")
+   * }}}
+   *
+   * This will transform the stream containing elements of two fields to a table where the fields
+   * are named a and b.
+   */
+  def fromDataStream[T](set: DataStream[T], fields: String): Table = {
+    new JavaStreamTranslator(config).createTable(set, fields)
+  }
+
+  /**
+   * Transforms the given DataStream to a [[org.apache.flink.api.table.Table]].
+   * The fields of the DataStream type are used to name the
+   * [[org.apache.flink.api.table.Table]] fields.
+   */
+  def fromDataStream[T](set: DataStream[T]): Table = {
+    new JavaStreamTranslator(config).createTable(set)
+  }
+
+    /**
+   * Converts the given [[org.apache.flink.api.table.Table]] to
+   * a DataStream. The given type must have exactly the same field types and field order as the
+   * [[org.apache.flink.api.table.Table]]. Row and tuple types can be mapped by position.
+   * POJO types require name equivalence to be mapped correctly as their fields do not have
+   * an order.
+   */
+  @SuppressWarnings(Array("unchecked"))
+  def toDataStream[T](table: Table, clazz: Class[T]): DataStream[T] = {
+    new JavaStreamTranslator(config).translate[T](table.relNode)(
+      TypeExtractor.createTypeInfo(clazz).asInstanceOf[TypeInformation[T]])
+  }
+
+  /**
+   * Converts the given [[org.apache.flink.api.table.Table]] to
+   * a DataStream. The given type must have exactly the same field types and field order as the
+   * [[org.apache.flink.api.table.Table]]. Row and tuple types can be mapped by position.
+   * POJO types require name equivalence to be mapped correctly as their fields do not have
+   * an order.
+   */
+  def toDataStream[T](table: Table, typeInfo: TypeInformation[T]): DataStream[T] = {
+    new JavaStreamTranslator(config).translate[T](table.relNode)(typeInfo)
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f773fe05/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataStreamConversions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataStreamConversions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataStreamConversions.scala
new file mode 100644
index 0000000..cd663b2
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataStreamConversions.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.table
+
+import org.apache.flink.api.table._
+import org.apache.flink.api.table.expressions.{UnresolvedFieldReference, Expression}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.DataStream
+
+/**
+ * Methods for converting a [[DataStream]] to a [[Table]]. A [[DataStream]] is
+ * wrapped in this by the implicit conversions in [[org.apache.flink.api.scala.table]].
+ */
+class DataStreamConversions[T](set: DataStream[T], inputType: CompositeType[T]) {
+
+  /**
+   * Converts the [[DataStream]] to a [[Table]]. The field names can be specified like this:
+   *
+   * {{{
+   *   val in: DataStream[(String, Int)] = ...
+   *   val table = in.as('a, 'b)
+   * }}}
+   *
+   * This results in a [[Table]] that has field `a` of type `String` and field `b`
+   * of type `Int`.
+   */
+  def as(fields: Expression*): Table = {
+     new ScalaStreamTranslator().createTable(set, fields.toArray)
+  }
+
+  /**
+   * Converts the [[DataStream]] to a [[Table]]. The field names will be taken from the field names
+   * of the input type.
+   *
+   * Example:
+   *
+   * {{{
+   *   val in: DataStream[(String, Int)] = ...
+   *   val table = in.toStreamTable
+   * }}}
+   *
+   * Here, the result is a [[Table]] that has field `_1` of type `String` and field `_2`
+   * of type `Int`.
+   */
+  def toStreamTable: Table = {
+    val resultFields = inputType.getFieldNames.map(UnresolvedFieldReference)
+    as(resultFields: _*)
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/f773fe05/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaStreamTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaStreamTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaStreamTranslator.scala
new file mode 100644
index 0000000..be3664f
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaStreamTranslator.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.table
+
+import org.apache.calcite.rel.RelNode
+import org.apache.flink.api.java.table.JavaStreamTranslator
+import org.apache.flink.api.table.plan._
+import org.apache.flink.api.table.{TableConfig, Table}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.scala.DataStream
+import org.apache.flink.streaming.api.scala.asScalaStream
+
+/**
+ * [[PlanTranslator]] for creating [[Table]]s from Scala [[DataStream]]s and
+ * translating them back to Scala [[DataStream]]s.
+ */
+class ScalaStreamTranslator(config: TableConfig = TableConfig.DEFAULT) extends PlanTranslator {
+
+  private val javaTranslator = new JavaStreamTranslator(config)
+
+  type Representation[A] = DataStream[A]
+
+  override def createTable[A](
+    repr: Representation[A],
+    fieldIndexes: Array[Int],
+    fieldNames: Array[String]): Table =
+  {
+    javaTranslator.createTable(repr.javaStream, fieldIndexes, fieldNames)
+  }
+
+  override def translate[O](op: RelNode)(implicit tpe: TypeInformation[O]): DataStream[O] = {
+    asScalaStream(javaTranslator.translate(op))
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f773fe05/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableConversions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableConversions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableConversions.scala
index 74c8ee8..f755b17 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableConversions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableConversions.scala
@@ -21,9 +21,10 @@ package org.apache.flink.api.scala.table
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala._
 import org.apache.flink.api.table._
+import org.apache.flink.streaming.api.scala.DataStream
 
 /**
- * Methods for converting a [[Table]] to a [[DataSet]]. A [[Table]] is
+ * Methods for converting a [[Table]] to a [[DataSet]] or a [[DataStream]]. A [[Table]] is
  * wrapped in this by the implicit conversions in [[org.apache.flink.api.scala.table]].
  */
 class TableConversions(table: Table) {
@@ -42,5 +43,18 @@ class TableConversions(table: Table) {
      new ScalaBatchTranslator(config).translate[T](table.relNode)
   }
 
+  /**
+   * Converts the [[Table]] to a [[DataStream]] using the default configuration.
+   */
+  def toDataStream[T: TypeInformation]: DataStream[T] = {
+     new ScalaStreamTranslator().translate[T](table.relNode)
+  }
+
+  /**
+   * Converts the [[Table]] to a [[DataStream]] using a custom configuration.
+   */
+  def toDataStream[T: TypeInformation](config: TableConfig): DataStream[T] = {
+     new ScalaStreamTranslator(config).translate[T](table.relNode)
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f773fe05/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableEnvironment.scala
index 9f71c63..46164be 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableEnvironment.scala
@@ -21,6 +21,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala.DataSet
 import org.apache.flink.api.table.expressions.Expression
 import org.apache.flink.api.table.{AbstractTableEnvironment, Table}
+import org.apache.flink.streaming.api.scala.DataStream
 
 /**
  * Environment for working with the Table API.
@@ -87,4 +88,39 @@ class TableEnvironment extends AbstractTableEnvironment {
     registerDataSetInternal(name, dataset.javaSet, fields.toArray)
   }
 
+    /**
+   * Converts the [[DataStream]] to a [[Table]]. The field names can be specified like this:
+   *
+   * {{{
+   *   val in: DataStream[(String, Int)] = ...
+   *   val table = in.as('a, 'b)
+   * }}}
+   *
+   * This results in a [[Table]] that has field `a` of type `String` and field `b`
+   * of type `Int`.
+   */
+  def fromDataStream[T](set: DataStream[T], fields: Expression*): Table = {
+    new ScalaStreamTranslator(config).createTable(set, fields.toArray)
+  }
+
+  /**
+   * Transforms the given DataStream to a [[org.apache.flink.api.table.Table]].
+   * The fields of the DataStream type are used to name the
+   * [[org.apache.flink.api.table.Table]] fields.
+   */
+  def fromDataStream[T](set: DataStream[T]): Table = {
+    new ScalaStreamTranslator(config).createTable(set)
+  }
+
+  /**
+   * Converts the given [[org.apache.flink.api.table.Table]] to
+   * a DataStream. The given type must have exactly the same field types and field order as the
+   * [[org.apache.flink.api.table.Table]]. Row and tuple types can be mapped by position.
+   * POJO types require name equivalence to be mapped correctly as their fields do not have
+   * an order.
+   */
+  def toDataStream[T: TypeInformation](table: Table): DataStream[T] = {
+     new ScalaStreamTranslator(config).translate[T](table.relNode)
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f773fe05/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala
index 5c3ba44..7812d7f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala
@@ -19,8 +19,8 @@ package org.apache.flink.api.scala
 
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.api.table.{Row, Table}
-
 import scala.language.implicitConversions
+import org.apache.flink.streaming.api.scala.DataStream
 
 /**
  * == Table API (Scala) ==
@@ -31,14 +31,14 @@ import scala.language.implicitConversions
  *   import org.apache.flink.api.scala.table._
  * }}}
  *
- * imports implicit conversions for converting a [[DataSet]] to a
+ * imports implicit conversions for converting a [[DataSet]] and a [[DataStream]] to a
  * [[Table]]. This can be used to perform SQL-like queries on data. Please have
  * a look at [[Table]] to see which operations are supported and
  * [[org.apache.flink.api.scala.table.ImplicitExpressionOperations]] to see how an
  * expression can be specified.
  *
  * When writing a query you can use Scala Symbols to refer to field names. One would
- * refer to field `a` by writing `'a`. Sometimes it is necessary to manually confert a
+ * refer to field `a` by writing `'a`. Sometimes it is necessary to manually convert a
  * Scala literal to an Expression Literal, in those cases use `Literal`, as in `Literal(3)`.
  *
  * Example:
@@ -85,4 +85,17 @@ package object table extends ImplicitExpressionConversions {
     rowDataSet.toTable
   }
 
+  implicit def dataStream2DataStreamConversions[T](set: DataStream[T]): DataStreamConversions[T] = {
+    new DataStreamConversions[T](set, set.getType.asInstanceOf[CompositeType[T]])
+  }
+
+  implicit def table2RowDataStream(
+      table: Table): DataStream[Row] = {
+    new ScalaStreamTranslator().translate[Row](table.relNode)
+  }
+
+  implicit def rowDataStream2Table(
+      rowDataStream: DataStream[Row]): Table = {
+    rowDataStream.toStreamTable
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f773fe05/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala
index 330fe6b..cd5e2b0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala
@@ -19,7 +19,6 @@
 package org.apache.flink.api.table.plan
 
 import java.util.concurrent.atomic.AtomicInteger
-
 import org.apache.calcite.config.Lex
 import org.apache.calcite.plan.ConventionTraitDef
 import org.apache.calcite.schema.impl.AbstractTable
@@ -32,7 +31,8 @@ import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
 import org.apache.flink.api.table.TableException
 import org.apache.flink.api.table.expressions.{Naming, UnresolvedFieldReference, Expression}
 import org.apache.flink.api.table.plan.cost.DataSetCostFactory
-import org.apache.flink.api.table.plan.schema.{TableTable, DataSetTable}
+import org.apache.flink.api.table.plan.schema.DataSetTable
+import org.apache.flink.api.table.plan.schema.DataStreamTable
 
 object TranslationContext {
 
@@ -115,6 +115,16 @@ object TranslationContext {
     }
   }
 
+  /**
+   * Adds a stream Table to the tables registry so it can be used by
+   * the streaming Table API.
+   */
+  def addDataStream(newTable: DataStreamTable[_]): String = {
+    val tabName = "_DataStreamTable_" + nameCntr.getAndIncrement()
+    tables.add(tabName, newTable)
+    tabName
+  }
+
   def getUniqueName: String = {
     "TMP_" + nameCntr.getAndIncrement()
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/f773fe05/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCalc.scala
new file mode 100644
index 0000000..aa5492f
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCalc.scala
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes
+
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rex.{RexNode, RexProgram}
+import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.TableConfig
+import org.apache.flink.api.table.codegen.{GeneratedFunction, CodeGenerator}
+import org.apache.flink.api.table.runtime.FlatMapRunner
+import org.apache.flink.api.table.typeutils.TypeConverter._
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+trait FlinkCalc {
+
+  private[flink] def functionBody(
+    generator: CodeGenerator,
+    inputType: TypeInformation[Any],
+    rowType: RelDataType,
+    calcProgram: RexProgram,
+    config: TableConfig,
+    expectedType: Option[TypeInformation[Any]]): String = {
+
+    val returnType = determineReturnType(
+      rowType,
+      expectedType,
+      config.getNullCheck,
+      config.getEfficientTypeUsage)
+
+    val condition = calcProgram.getCondition
+    val expandedExpressions = calcProgram.getProjectList.map(
+      expr => calcProgram.expandLocalRef(expr))
+    val projection = generator.generateResultExpression(
+      returnType,
+      rowType.getFieldNames,
+      expandedExpressions)
+
+      // only projection
+      if (condition == null) {
+        s"""
+          |${projection.code}
+          |${generator.collectorTerm}.collect(${projection.resultTerm});
+          |""".stripMargin
+      }
+      else {
+        val filterCondition = generator.generateExpression(
+          calcProgram.expandLocalRef(calcProgram.getCondition))
+        // only filter
+        if (projection == null) {
+          // conversion
+          if (inputType != returnType) {
+            val conversion = generator.generateConverterResultExpression(
+              returnType,
+              rowType.getFieldNames)
+
+            s"""
+              |${filterCondition.code}
+              |if (${filterCondition.resultTerm}) {
+              |  ${conversion.code}
+              |  ${generator.collectorTerm}.collect(${conversion.resultTerm});
+              |}
+              |""".stripMargin
+          }
+          // no conversion
+          else {
+            s"""
+              |${filterCondition.code}
+              |if (${filterCondition.resultTerm}) {
+              |  ${generator.collectorTerm}.collect(${generator.input1Term});
+              |}
+              |""".stripMargin
+          }
+        }
+        // both filter and projection
+        else {
+          s"""
+            |${filterCondition.code}
+            |if (${filterCondition.resultTerm}) {
+            |  ${projection.code}
+            |  ${generator.collectorTerm}.collect(${projection.resultTerm});
+            |}
+            |""".stripMargin
+        }
+      }
+    }
+
+  private[flink] def calcMapFunction(
+      genFunction: GeneratedFunction[FlatMapFunction[Any, Any]]): RichFlatMapFunction[Any, Any] = {
+
+    new FlatMapRunner[Any, Any](
+      genFunction.name,
+      genFunction.code,
+      genFunction.returnType)
+  }
+
+  private[flink] def conditionToString(
+      calcProgram: RexProgram,
+      expression: (RexNode, List[String], Option[List[RexNode]]) => String): String = {
+
+    val cond = calcProgram.getCondition
+    val inFields = calcProgram.getInputRowType.getFieldNames.asScala.toList
+    val localExprs = calcProgram.getExprList.asScala.toList
+
+    if (cond != null) {
+      expression(cond, inFields, Some(localExprs))
+    } else {
+      ""
+    }
+  }
+
+  private[flink] def selectionToString(
+      calcProgram: RexProgram,
+      expression: (RexNode, List[String], Option[List[RexNode]]) => String): String = {
+
+    val proj = calcProgram.getProjectList.asScala.toList
+    val inFields = calcProgram.getInputRowType.getFieldNames.asScala.toList
+    val localExprs = calcProgram.getExprList.asScala.toList
+    val outFields = calcProgram.getInputRowType.getFieldNames.asScala.toList
+
+    proj
+      .map(expression(_, inFields, Some(localExprs)))
+      .zip(outFields).map { case (e, o) => {
+      if (e != o) {
+        e + " AS " + o
+      } else {
+        e
+      }
+    }
+    }.mkString(", ")
+  }
+
+  private[flink] def calcOpName(
+      calcProgram: RexProgram,
+      expression: (RexNode, List[String], Option[List[RexNode]]) => String) = {
+
+    val conditionStr = conditionToString(calcProgram, expression)
+    val selectionStr = selectionToString(calcProgram, expression)
+
+    s"${if (calcProgram.getCondition != null) {
+      s"where: ($conditionStr), "
+    } else {
+      ""
+    }}select: ($selectionStr)"
+  }
+
+  private[flink] def calcToString(
+      calcProgram: RexProgram,
+      expression: (RexNode, List[String], Option[List[RexNode]]) => String) = {
+
+    val name = calcOpName(calcProgram, expression)
+    s"Calc($name)"
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f773fe05/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkRel.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkRel.scala
new file mode 100644
index 0000000..dad50a3
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkRel.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes
+
+import org.apache.calcite.rex._
+import scala.collection.JavaConversions._
+
+trait FlinkRel {
+
+  private[flink] def getExpressionString(
+    expr: RexNode,
+    inFields: List[String],
+    localExprsTable: Option[List[RexNode]]): String = {
+
+    expr match {
+      case i: RexInputRef => inFields.get(i.getIndex)
+      case l: RexLiteral => l.toString
+      case l: RexLocalRef if localExprsTable.isEmpty =>
+        throw new IllegalArgumentException("Encountered RexLocalRef without local expression table")
+      case l: RexLocalRef =>
+        val lExpr = localExprsTable.get(l.getIndex)
+        getExpressionString(lExpr, inFields, localExprsTable)
+      case c: RexCall => {
+        val op = c.getOperator.toString
+        val ops = c.getOperands.map(getExpressionString(_, inFields, localExprsTable))
+        s"$op(${ops.mkString(", ")})"
+      }
+      case _ => throw new IllegalArgumentException("Unknown expression type: " + expr)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f773fe05/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala
index 5fd3e86..67daffc 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala
@@ -26,15 +26,12 @@ import org.apache.flink.api.common.functions.FlatMapFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
 import org.apache.flink.api.table.codegen.CodeGenerator
+import org.apache.flink.api.table.plan.nodes.FlinkCalc
 import org.apache.flink.api.table.typeutils.TypeConverter
 import TypeConverter._
-import org.apache.flink.api.table.runtime.FlatMapRunner
 import org.apache.flink.api.table.TableConfig
 import org.apache.calcite.rex._
 
-import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
-
 /**
   * Flink RelNode which matches along with LogicalCalc.
   *
@@ -47,6 +44,7 @@ class DataSetCalc(
     calcProgram: RexProgram,
     ruleDescription: String)
   extends SingleRel(cluster, traitSet, input)
+  with FlinkCalc
   with DataSetRel {
 
   override def deriveRowType() = rowType
@@ -61,18 +59,14 @@ class DataSetCalc(
       ruleDescription)
   }
 
-  override def toString: String = {
-    s"Calc(${if (calcProgram.getCondition != null) {
-      s"where: ($conditionToString), "
-    } else {
-      ""
-    }}select: ($selectionToString))"
-  }
+  override def toString: String = calcToString(calcProgram, getExpressionString(_, _, _))
 
   override def explainTerms(pw: RelWriter): RelWriter = {
     super.explainTerms(pw)
-      .item("select", selectionToString)
-      .itemIf("where", conditionToString, calcProgram.getCondition != null)
+      .item("select", selectionToString(calcProgram, getExpressionString(_, _, _)))
+      .itemIf("where",
+        conditionToString(calcProgram, getExpressionString(_, _, _)),
+        calcProgram.getCondition != null)
   }
 
   override def computeSelfCost (planner: RelOptPlanner): RelOptCost = {
@@ -108,63 +102,13 @@ class DataSetCalc(
 
     val generator = new CodeGenerator(config, inputDS.getType)
 
-    val condition = calcProgram.getCondition
-    val expandedExpressions = calcProgram.getProjectList.map(
-       expr => calcProgram.expandLocalRef(expr))
-    val projection = generator.generateResultExpression(
-      returnType,
-      rowType.getFieldNames,
-      expandedExpressions)
-
-    val body = {
-      // only projection
-      if (condition == null) {
-        s"""
-          |${projection.code}
-          |${generator.collectorTerm}.collect(${projection.resultTerm});
-          |""".stripMargin
-      }
-      else {
-        val filterCondition = generator.generateExpression(
-          calcProgram.expandLocalRef(calcProgram.getCondition))
-        // only filter
-        if (projection == null) {
-          // conversion
-          if (inputDS.getType != returnType) {
-            val conversion = generator.generateConverterResultExpression(
-              returnType,
-              rowType.getFieldNames)
-
-            s"""
-              |${filterCondition.code}
-              |if (${filterCondition.resultTerm}) {
-              |  ${conversion.code}
-              |  ${generator.collectorTerm}.collect(${conversion.resultTerm});
-              |}
-              |""".stripMargin
-          }
-          // no conversion
-          else {
-            s"""
-              |${filterCondition.code}
-              |if (${filterCondition.resultTerm}) {
-              |  ${generator.collectorTerm}.collect(${generator.input1Term});
-              |}
-              |""".stripMargin
-          }
-        }
-        // both filter and projection
-        else {
-          s"""
-            |${filterCondition.code}
-            |if (${filterCondition.resultTerm}) {
-            |  ${projection.code}
-            |  ${generator.collectorTerm}.collect(${projection.resultTerm});
-            |}
-            |""".stripMargin
-        }
-      }
-    }
+    val body = functionBody(
+      generator,
+      inputDS.getType,
+      getRowType,
+      calcProgram,
+      config,
+      expectedType)
 
     val genFunction = generator.generateFunction(
       ruleDescription,
@@ -172,50 +116,8 @@ class DataSetCalc(
       body,
       returnType)
 
-    val mapFunc = new FlatMapRunner[Any, Any](
-      genFunction.name,
-      genFunction.code,
-      genFunction.returnType)
-
-    val calcOpName =
-      s"${if (condition != null) {
-          s"where: ($conditionToString), "
-      } else {
-        ""
-      }}select: ($selectionToString)"
-
-    inputDS.flatMap(mapFunc).name(calcOpName)
-  }
-
-  private def selectionToString: String = {
-    val proj = calcProgram.getProjectList.asScala.toList
-    val inFields = calcProgram.getInputRowType.getFieldNames.asScala.toList
-    val localExprs = calcProgram.getExprList.asScala.toList
-    val outFields = calcProgram.getInputRowType.getFieldNames.asScala.toList
-
-    proj
-      .map(getExpressionString(_, inFields, Some(localExprs)))
-      .zip(outFields).map { case (e, o) => {
-          if (e != o) {
-            e + " AS " + o
-          } else {
-            e
-          }
-        }
-      }.mkString(", ")
-  }
-
-  private def conditionToString: String = {
-
-    val cond = calcProgram.getCondition
-    val inFields = calcProgram.getInputRowType.getFieldNames.asScala.toList
-    val localExprs = calcProgram.getExprList.asScala.toList
-
-    if (cond != null) {
-      getExpressionString(cond, inFields, Some(localExprs))
-    } else {
-      ""
-    }
+    val mapFunc = calcMapFunction(genFunction)
+    inputDS.flatMap(mapFunc).name(calcOpName(calcProgram, getExpressionString(_, _, _)))
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f773fe05/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala
index 502e362..eaf6b26 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala
@@ -20,15 +20,15 @@ package org.apache.flink.api.table.plan.nodes.dataset
 
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rex._
 import org.apache.calcite.sql.`type`.SqlTypeName
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
 import org.apache.flink.api.table.TableConfig
+import org.apache.flink.api.table.plan.nodes.FlinkRel
 
 import scala.collection.JavaConversions._
 
-trait DataSetRel extends RelNode {
+trait DataSetRel extends RelNode with FlinkRel {
 
   /**
     * Translates the FlinkRelNode into a Flink operator.
@@ -46,28 +46,6 @@ trait DataSetRel extends RelNode {
       expectedType: Option[TypeInformation[Any]] = None)
     : DataSet[Any]
 
-  private[flink] def getExpressionString(
-    expr: RexNode,
-    inFields: List[String],
-    localExprsTable: Option[List[RexNode]]): String = {
-
-    expr match {
-      case i: RexInputRef => inFields.get(i.getIndex)
-      case l: RexLiteral => l.toString
-      case l: RexLocalRef if localExprsTable.isEmpty =>
-        throw new IllegalArgumentException("Encountered RexLocalRef without local expression table")
-      case l: RexLocalRef =>
-        val lExpr = localExprsTable.get(l.getIndex)
-        getExpressionString(lExpr, inFields, localExprsTable)
-      case c: RexCall => {
-        val op = c.getOperator.toString
-        val ops = c.getOperands.map(getExpressionString(_, inFields, localExprsTable))
-        s"$op(${ops.mkString(", ")})"
-      }
-      case _ => throw new IllegalArgumentException("Unknown expression type: " + expr)
-    }
-  }
-
   private[flink] def estimateRowSize(rowType: RelDataType): Double = {
 
     rowType.getFieldList.map(_.getType.getSqlTypeName).foldLeft(0) { (s, t) =>
@@ -88,4 +66,3 @@ trait DataSetRel extends RelNode {
   }
 
 }
-

http://git-wip-us.apache.org/repos/asf/flink/blob/f773fe05/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCalc.scala
new file mode 100644
index 0000000..fb058f3
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCalc.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.calcite.rex.RexProgram
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.TableConfig
+import org.apache.flink.api.table.codegen.CodeGenerator
+import org.apache.flink.api.table.plan.nodes.FlinkCalc
+import org.apache.flink.api.table.typeutils.TypeConverter._
+import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.streaming.api.datastream.DataStream
+
+/**
+  * Flink RelNode which matches along with FlatMapOperator.
+  *
+  */
+class DataStreamCalc(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    input: RelNode,
+    rowType: RelDataType,
+    calcProgram: RexProgram,
+    ruleDescription: String)
+  extends SingleRel(cluster, traitSet, input)
+  with FlinkCalc
+  with DataStreamRel {
+
+  override def deriveRowType() = rowType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    new DataStreamCalc(
+      cluster,
+      traitSet,
+      inputs.get(0),
+      rowType,
+      calcProgram,
+      ruleDescription
+    )
+  }
+
+  override def toString: String = calcToString(calcProgram, getExpressionString(_, _, _))
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    super.explainTerms(pw)
+      .item("select", selectionToString(calcProgram, getExpressionString(_, _, _)))
+      .itemIf("where",
+        conditionToString(calcProgram, getExpressionString(_, _, _)),
+        calcProgram.getCondition != null)
+  }
+
+  override def translateToPlan(config: TableConfig,
+      expectedType: Option[TypeInformation[Any]]): DataStream[Any] = {
+
+    val inputDataStream = input.asInstanceOf[DataStreamRel].translateToPlan(config)
+
+    val returnType = determineReturnType(
+      getRowType,
+      expectedType,
+      config.getNullCheck,
+      config.getEfficientTypeUsage)
+
+    val generator = new CodeGenerator(config, inputDataStream.getType)
+
+    val body = functionBody(
+      generator,
+      inputDataStream.getType,
+      getRowType,
+      calcProgram,
+      config,
+      expectedType)
+
+    val genFunction = generator.generateFunction(
+      ruleDescription,
+      classOf[FlatMapFunction[Any, Any]],
+      body,
+      returnType)
+
+    val mapFunc = calcMapFunction(genFunction)
+    inputDataStream.flatMap(mapFunc).name(calcOpName(calcProgram, getExpressionString(_, _, _)))
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f773fe05/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamConvention.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamConvention.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamConvention.scala
new file mode 100644
index 0000000..bcce4c4
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamConvention.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.datastream
+
+import org.apache.calcite.plan._
+
+class DataStreamConvention extends Convention {
+
+  override def toString: String = getName
+
+  def getInterface: Class[_] = classOf[DataStreamRel]
+
+  def getName: String = "DATASTREAM"
+
+  def getTraitDef: RelTraitDef[_ <: RelTrait] = ConventionTraitDef.INSTANCE
+
+  def satisfies(`trait`: RelTrait): Boolean = this eq `trait`
+
+  def register(planner: RelOptPlanner): Unit = { }
+
+}
+
+object DataStreamConvention {
+
+  val INSTANCE = new DataStreamConvention
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f773fe05/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamRel.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamRel.scala
new file mode 100644
index 0000000..0673a35
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamRel.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.datastream
+
+import org.apache.calcite.rel.RelNode
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.TableConfig
+import org.apache.flink.api.table.plan.nodes.FlinkRel
+import org.apache.flink.streaming.api.datastream.DataStream
+
+trait DataStreamRel extends RelNode with FlinkRel {
+
+  /**
+    * Translates the FlinkRelNode into a Flink operator.
+    *
+    * @param config runtime configuration
+    * @param expectedType specifies the type the Flink operator should return. The type must
+    *                     have the same arity as the result. For instance, if the
+    *                     expected type is a RowTypeInfo this method will return a DataSet of
+    *                     type Row. If the expected type is Tuple2, the operator will return
+    *                     a Tuple2 if possible. Row otherwise.
+    * @return DataStream of type expectedType or RowTypeInfo
+    */
+  def translateToPlan(
+      config: TableConfig,
+      expectedType: Option[TypeInformation[Any]] = None)
+    : DataStream[Any]
+
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/f773fe05/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamSource.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamSource.scala
new file mode 100644
index 0000000..314759c
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamSource.scala
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.datastream
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.TableScan
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.PojoTypeInfo
+import org.apache.flink.api.table.TableConfig
+import org.apache.flink.api.table.codegen.CodeGenerator
+import org.apache.flink.api.table.typeutils.TypeConverter.determineReturnType
+import org.apache.flink.api.table.plan.schema.DataStreamTable
+import org.apache.flink.api.table.runtime.MapRunner
+import scala.collection.JavaConversions._
+import org.apache.flink.streaming.api.datastream.DataStream
+
+/**
+  * Flink RelNode which matches along with DataStreamSource.
+  * It ensures that types without deterministic field order (e.g. POJOs) are not part of
+  * the plan translation.
+  */
+class DataStreamSource(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    table: RelOptTable,
+    rowType: RelDataType)
+  extends TableScan(cluster, traitSet, table)
+  with DataStreamRel {
+
+  val dataStreamTable: DataStreamTable[Any] = table.unwrap(classOf[DataStreamTable[Any]])
+
+  override def deriveRowType() = rowType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    new DataStreamSource(
+      cluster,
+      traitSet,
+      table,
+      rowType
+    )
+  }
+
+  override def translateToPlan(
+      config: TableConfig,
+      expectedType: Option[TypeInformation[Any]])
+    : DataStream[Any] = {
+
+    val inputDataStream: DataStream[Any] = dataStreamTable.dataStream
+    val inputType = inputDataStream.getType
+
+    expectedType match {
+
+      // special case:
+      // if efficient type usage is enabled and no expected type is set
+      // we can simply forward the DataStream to the next operator.
+      // however, we cannot forward PojoTypes as their fields don't have an order
+      case None if config.getEfficientTypeUsage && !inputType.isInstanceOf[PojoTypeInfo[_]] =>
+        inputDataStream
+
+      case _ =>
+        val determinedType = determineReturnType(
+          getRowType,
+          expectedType,
+          config.getNullCheck,
+          config.getEfficientTypeUsage)
+
+        // conversion
+        if (determinedType != inputType) {
+          val generator = new CodeGenerator(
+            config,
+            inputDataStream.getType,
+            dataStreamTable.fieldIndexes)
+
+          val conversion = generator.generateConverterResultExpression(
+            determinedType,
+            getRowType.getFieldNames)
+
+          val body =
+            s"""
+              |${conversion.code}
+              |return ${conversion.resultTerm};
+              |""".stripMargin
+
+          val genFunction = generator.generateFunction(
+            "DataStreamSourceConversion",
+            classOf[MapFunction[Any, Any]],
+            body,
+            determinedType)
+
+          val mapFunc = new MapRunner[Any, Any](
+            genFunction.name,
+            genFunction.code,
+            genFunction.returnType)
+
+          inputDataStream.map(mapFunc)
+        }
+        // no conversion necessary, forward
+        else {
+          inputDataStream
+        }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f773fe05/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamUnion.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamUnion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamUnion.scala
new file mode 100644
index 0000000..8c9cca0
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamUnion.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.{RelNode, RelWriter, BiRel}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.TableConfig
+import org.apache.flink.streaming.api.datastream.DataStream
+
+import scala.collection.JavaConverters._
+
+/**
+  * Flink RelNode which matches along with Union.
+  *
+  */
+class DataStreamUnion(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    left: RelNode,
+    right: RelNode,
+    rowType: RelDataType)
+  extends BiRel(cluster, traitSet, left, right)
+  with DataStreamRel {
+
+  override def deriveRowType() = rowType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    new DataStreamUnion(
+      cluster,
+      traitSet,
+      inputs.get(0),
+      inputs.get(1),
+      rowType
+    )
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    super.explainTerms(pw).item("union", unionSelectionToString)
+  }
+
+  override def toString = {
+    s"Union(union: ($unionSelectionToString))"
+  }
+
+  override def translateToPlan(config: TableConfig,
+      expectedType: Option[TypeInformation[Any]]): DataStream[Any] = {
+
+    val leftDataSet = left.asInstanceOf[DataStreamRel].translateToPlan(config)
+    val rightDataSet = right.asInstanceOf[DataStreamRel].translateToPlan(config)
+    leftDataSet.union(rightDataSet)
+  }
+
+  private def unionSelectionToString: String = {
+    rowType.getFieldNames.asScala.toList.mkString(", ")
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f773fe05/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
index b0815ef..0324a0e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
@@ -21,11 +21,14 @@ package org.apache.flink.api.table.plan.rules
 import org.apache.calcite.rel.rules._
 import org.apache.calcite.tools.{RuleSets, RuleSet}
 import org.apache.flink.api.table.plan.rules.dataSet._
+import org.apache.flink.api.table.plan.rules.datastream.DataStreamCalcRule
+import org.apache.flink.api.table.plan.rules.datastream.DataStreamScanRule
+import org.apache.flink.api.table.plan.rules.datastream.DataStreamUnionRule
 
 object FlinkRuleSets {
 
   /**
-    * RuleSet to optimize plans for batch / DataSet exeuction
+    * RuleSet to optimize plans for batch / DataSet execution
     */
   val DATASET_OPT_RULES: RuleSet = RuleSets.ofList(
 
@@ -99,4 +102,38 @@ object FlinkRuleSets {
     DataSetUnionRule.INSTANCE
   )
 
+  /**
+    * RuleSet to optimize plans for stream / DataStream execution
+    */
+  val DATASTREAM_OPT_RULES: RuleSet = RuleSets.ofList(
+
+    // translate to DataStream nodes
+    DataStreamCalcRule.INSTANCE,
+    DataStreamScanRule.INSTANCE,
+    DataStreamUnionRule.INSTANCE,
+
+    // calc rules
+    FilterToCalcRule.INSTANCE,
+    ProjectToCalcRule.INSTANCE,
+    FilterCalcMergeRule.INSTANCE,
+    ProjectCalcMergeRule.INSTANCE,
+    CalcMergeRule.INSTANCE,
+
+    // prune empty results rules
+    PruneEmptyRules.FILTER_INSTANCE,
+    PruneEmptyRules.PROJECT_INSTANCE,
+    PruneEmptyRules.UNION_INSTANCE,
+
+    // simplify expressions rules
+    ReduceExpressionsRule.CALC_INSTANCE,
+
+    // push and merge projection rules
+    ProjectFilterTransposeRule.INSTANCE,
+    FilterProjectTransposeRule.INSTANCE,
+    ProjectRemoveRule.INSTANCE,
+
+    // merge and push unions rules
+    UnionEliminatorRule.INSTANCE
+  )
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f773fe05/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamCalcRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamCalcRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamCalcRule.scala
new file mode 100644
index 0000000..b62967a
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamCalcRule.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.datastream
+
+import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.LogicalCalc
+import org.apache.flink.api.table.plan.nodes.datastream.DataStreamCalc
+import org.apache.flink.api.table.plan.nodes.datastream.DataStreamConvention
+
+/**
+  * Converter rule that translates a [[LogicalCalc]] (combined projection/filter)
+  * from Calcite's default [[Convention.NONE]] into a [[DataStreamCalc]] node
+  * in the DataStream convention.
+  */
+class DataStreamCalcRule
+  extends ConverterRule(
+    classOf[LogicalCalc],
+    Convention.NONE,
+    DataStreamConvention.INSTANCE,
+    "DataStreamCalcRule")
+{
+
+  def convert(rel: RelNode): RelNode = {
+    val calc: LogicalCalc = rel.asInstanceOf[LogicalCalc]
+    // Replace the convention trait so the planner treats this node as a DataStream operator.
+    val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
+    // Request conversion of the input to the DataStream convention as well.
+    val convInput: RelNode = RelOptRule.convert(calc.getInput, DataStreamConvention.INSTANCE)
+
+    new DataStreamCalc(
+      rel.getCluster,
+      traitSet,
+      convInput,
+      rel.getRowType,
+      calc.getProgram,
+      description)
+  }
+}
+
+/** Singleton instance registered in [[FlinkRuleSets.DATASTREAM_OPT_RULES]]. */
+object DataStreamCalcRule {
+  val INSTANCE: RelOptRule = new DataStreamCalcRule
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f773fe05/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamScanRule.scala
new file mode 100644
index 0000000..73d5868
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamScanRule.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.datastream
+
+import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.LogicalTableScan
+import org.apache.flink.api.table.plan.nodes.datastream.DataStreamConvention
+import org.apache.flink.api.table.plan.nodes.datastream.DataStreamSource
+
+/**
+  * Converter rule that translates a [[LogicalTableScan]] from Calcite's default
+  * [[Convention.NONE]] into a [[DataStreamSource]] node in the DataStream convention.
+  */
+class DataStreamScanRule
+  extends ConverterRule(
+    classOf[LogicalTableScan],
+    Convention.NONE,
+    DataStreamConvention.INSTANCE,
+    "DataStreamScanRule")
+{
+
+  def convert(rel: RelNode): RelNode = {
+    val scan: LogicalTableScan = rel.asInstanceOf[LogicalTableScan]
+    // Replace the convention trait; a scan has no inputs, so no recursive conversion is needed.
+    val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
+
+    new DataStreamSource(
+      rel.getCluster,
+      traitSet,
+      scan.getTable,
+      rel.getRowType
+    )
+  }
+}
+
+/** Singleton instance registered in [[FlinkRuleSets.DATASTREAM_OPT_RULES]]. */
+object DataStreamScanRule {
+  val INSTANCE: RelOptRule = new DataStreamScanRule
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f773fe05/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamUnionRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamUnionRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamUnionRule.scala
new file mode 100644
index 0000000..78a5486
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamUnionRule.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.datastream
+
+import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.LogicalUnion
+import org.apache.flink.api.table.plan.nodes.datastream.DataStreamConvention
+import org.apache.flink.api.table.plan.nodes.datastream.DataStreamUnion
+
+/**
+  * Converter rule that translates a [[LogicalUnion]] from Calcite's default
+  * [[Convention.NONE]] into a binary [[DataStreamUnion]] node in the DataStream convention.
+  */
+class DataStreamUnionRule
+  extends ConverterRule(
+    classOf[LogicalUnion],
+    Convention.NONE,
+    DataStreamConvention.INSTANCE,
+    "DataStreamUnionRule")
+{
+
+  def convert(rel: RelNode): RelNode = {
+    val union: LogicalUnion = rel.asInstanceOf[LogicalUnion]
+    val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
+    // NOTE(review): only inputs 0 and 1 are converted. A LogicalUnion may be n-ary in
+    // Calcite; presumably unions are binarized before this rule fires — confirm, otherwise
+    // additional inputs would be silently dropped.
+    val convLeft: RelNode = RelOptRule.convert(union.getInput(0), DataStreamConvention.INSTANCE)
+    val convRight: RelNode = RelOptRule.convert(union.getInput(1), DataStreamConvention.INSTANCE)
+
+    new DataStreamUnion(
+      rel.getCluster,
+      traitSet,
+      convLeft,
+      convRight,
+      rel.getRowType)
+  }
+}
+
+/** Singleton instance registered in [[FlinkRuleSets.DATASTREAM_OPT_RULES]]. */
+object DataStreamUnionRule {
+  val INSTANCE: RelOptRule = new DataStreamUnionRule
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f773fe05/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataSetTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataSetTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataSetTable.scala
index ff371f7..bbcba13 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataSetTable.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataSetTable.scala
@@ -23,58 +23,15 @@ import java.util
 import java.util.Collections
 
 import org.apache.calcite.rel.{RelCollation, RelDistribution}
-import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
 import org.apache.calcite.schema.Statistic
-import org.apache.calcite.schema.impl.AbstractTable
-import org.apache.calcite.sql.`type`.SqlTypeName
 import org.apache.calcite.util.ImmutableBitSet
-import org.apache.flink.api.common.typeinfo.AtomicType
-import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.table.typeutils.TypeConverter
 
 class DataSetTable[T](
     val dataSet: DataSet[T],
-    val fieldIndexes: Array[Int],
-    val fieldNames: Array[String])
-  extends AbstractTable {
-
-  if (fieldIndexes.length != fieldNames.length) {
-    throw new IllegalArgumentException(
-      "Number of field indexes and field names must be equal.")
-  }
-
-  // check uniquenss of field names
-  if (fieldNames.length != fieldNames.toSet.size) {
-    throw new IllegalArgumentException(
-      "Table field names must be unique.")
-  }
-
-  val fieldTypes: Array[SqlTypeName] =
-    dataSet.getType match {
-      case cType: CompositeType[T] =>
-        if (fieldNames.length != cType.getArity) {
-          throw new IllegalArgumentException(
-          s"Arity of DataSet type (" + cType.getFieldNames.deep + ") " +
-            "not equal to number of field names " + fieldNames.deep + ".")
-        }
-        fieldIndexes
-          .map(cType.getTypeAt(_))
-          .map(TypeConverter.typeInfoToSqlType(_))
-      case aType: AtomicType[T] =>
-        if (fieldIndexes.length != 1 || fieldIndexes(0) != 0) {
-          throw new IllegalArgumentException(
-            "Non-composite input type may have only a single field and its index must be 0.")
-        }
-        Array(TypeConverter.typeInfoToSqlType(aType))
-    }
-
-  override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = {
-    val builder = typeFactory.builder
-    fieldNames.zip(fieldTypes)
-      .foreach( f => builder.add(f._1, f._2).nullable(true) )
-    builder.build
-  }
+    override val fieldIndexes: Array[Int],
+    override val fieldNames: Array[String])
+  extends FlinkTable[T](dataSet.getType, fieldIndexes, fieldNames) {
 
   override def getStatistic: Statistic = {
     new DefaultDataSetStatistic

http://git-wip-us.apache.org/repos/asf/flink/blob/f773fe05/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataStreamTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataStreamTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataStreamTable.scala
new file mode 100644
index 0000000..1523f93
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataStreamTable.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.schema
+
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
+import org.apache.flink.streaming.api.datastream.DataStream
+
+/**
+  * Calcite table wrapping a [[DataStream]] so it can be registered in the catalog
+  * and scanned by the streaming Table API.
+  *
+  * @param dataStream the wrapped stream; its element type supplies the field types
+  * @param fieldIndexes indexes of the stream's fields exposed as table columns
+  * @param fieldNames unique names for the exposed columns
+  */
+class DataStreamTable[T](
+    val dataStream: DataStream[T],
+    override val fieldIndexes: Array[Int],
+    override val fieldNames: Array[String])
+  extends FlinkTable[T](dataStream.getType, fieldIndexes, fieldNames) {
+
+  // NOTE(review): this override is line-for-line identical to FlinkTable.getRowType and
+  // looks redundant — presumably it can be removed; confirm before relying on it.
+  override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = {
+    val builder = typeFactory.builder
+    // Every column is declared nullable in the relational row type.
+    fieldNames.zip(fieldTypes)
+      .foreach( f => builder.add(f._1, f._2).nullable(true) )
+    builder.build
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f773fe05/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/FlinkTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/FlinkTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/FlinkTable.scala
new file mode 100644
index 0000000..9414fae
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/FlinkTable.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.schema
+
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
+import org.apache.calcite.schema.impl.AbstractTable
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.flink.api.common.typeinfo.{TypeInformation, AtomicType}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.table.typeutils.TypeConverter
+import org.apache.flink.streaming.api.datastream.DataStream
+
+/**
+  * Base class for tables backed by a Flink type: shared by DataSetTable and
+  * DataStreamTable. Validates the field mapping and derives the SQL row type
+  * from the Flink [[TypeInformation]].
+  *
+  * @param typeInfo Flink type of the backing DataSet/DataStream elements
+  * @param fieldIndexes indexes of the fields exposed as table columns
+  * @param fieldNames unique names for the exposed columns (same length as fieldIndexes)
+  * @tparam T element type of the backing DataSet/DataStream
+  */
+abstract class FlinkTable[T](
+    val typeInfo: TypeInformation[T],
+    val fieldIndexes: Array[Int],
+    val fieldNames: Array[String])
+  extends AbstractTable {
+
+  if (fieldIndexes.length != fieldNames.length) {
+    throw new IllegalArgumentException(
+      "Number of field indexes and field names must be equal.")
+  }
+
+  // check uniqueness of field names
+  if (fieldNames.length != fieldNames.toSet.size) {
+    throw new IllegalArgumentException(
+      "Table field names must be unique.")
+  }
+
+  // SQL types of the exposed columns, derived eagerly at construction time.
+  val fieldTypes: Array[SqlTypeName] =
+    typeInfo match {
+      case cType: CompositeType[T] =>
+        if (fieldNames.length != cType.getArity) {
+          // NOTE(review): message says "DataStream type" but this class is also the base
+          // of DataSetTable, so the wording is misleading for batch tables.
+          throw new IllegalArgumentException(
+          s"Arity of DataStream type (" + cType.getFieldNames.deep + ") " +
+            "not equal to number of field names " + fieldNames.deep + ".")
+        }
+        fieldIndexes
+          .map(cType.getTypeAt(_))
+          .map(TypeConverter.typeInfoToSqlType(_))
+      case aType: AtomicType[T] =>
+        // An atomic (non-composite) type contributes exactly one column at index 0.
+        if (fieldIndexes.length != 1 || fieldIndexes(0) != 0) {
+          throw new IllegalArgumentException(
+            "Non-composite input type may have only a single field and its index must be 0.")
+        }
+        Array(TypeConverter.typeInfoToSqlType(aType))
+    }
+
+  /** Builds the Calcite row type; every column is declared nullable. */
+  override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = {
+    val builder = typeFactory.builder
+    fieldNames.zip(fieldTypes)
+      .foreach( f => builder.add(f._1, f._2).nullable(true) )
+    builder.build
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f773fe05/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/FilterITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/FilterITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/FilterITCase.scala
new file mode 100644
index 0000000..fbfb39e
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/FilterITCase.scala
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.table.streaming.test
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.expressions.Literal
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.collection.JavaConversions._
+import org.junit.Test
+import org.junit.Assert._
+import org.apache.flink.api.scala.table.streaming.test.utils.StreamITCase
+import org.apache.flink.api.scala.table.streaming.test.utils.StreamTestData
+
+/**
+  * Integration tests for filter predicates on streaming tables.
+  *
+  * NOTE(review): all tests write into the static StreamITCase.testResults buffer and
+  * reset it at the start of each test, so they must not run concurrently — confirm the
+  * test base enforces serial execution.
+  */
+class FilterITCase extends StreamingMultipleProgramsTestBase {
+
+  @Test
+  def testSimpleFilter(): Unit = {
+    /*
+     * Test simple filter on a single field
+     */
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    StreamITCase.testResults = mutable.MutableList()
+    val ds = StreamTestData.getSmall3TupleDataStream(env).as('a, 'b, 'c)
+
+    val filterDs = ds.filter('a === 3)
+    val results = filterDs.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList("3,2,Hello world")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testAllRejectingFilter(): Unit = {
+    /*
+     * Test all-rejecting filter (constant false predicate)
+     */
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    StreamITCase.testResults = mutable.MutableList()
+    val ds = StreamTestData.getSmall3TupleDataStream(env).as('a, 'b, 'c)
+
+    val filterDs = ds.filter( Literal(false) )
+    val results = filterDs.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    assertEquals(true, StreamITCase.testResults.isEmpty)
+  }
+
+  @Test
+  def testAllPassingFilter(): Unit = {
+    /*
+     * Test all-passing filter (constant true predicate)
+     */
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    StreamITCase.testResults = mutable.MutableList()
+    val ds = StreamTestData.getSmall3TupleDataStream(env).as('a, 'b, 'c)
+
+    val filterDs = ds.filter( Literal(true) )
+    val results = filterDs.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+        "1,1,Hi",
+        "2,2,Hello",
+        "3,2,Hello world")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testFilterOnIntegerTupleField(): Unit = {
+    /*
+     * Test filter on Integer tuple field.
+     */
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    StreamITCase.testResults = mutable.MutableList()
+    val ds = StreamTestData.get3TupleDataStream(env).as('a, 'b, 'c)
+
+    val filterDs = ds.filter( 'a % 2 === 0 )
+    val results = filterDs.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "2,2,Hello", "4,3,Hello world, how are you?",
+      "6,3,Luke Skywalker", "8,4,Comment#2", "10,4,Comment#4",
+      "12,5,Comment#6", "14,5,Comment#8", "16,6,Comment#10",
+      "18,6,Comment#12", "20,6,Comment#14")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testNotEquals(): Unit = {
+    /*
+     * Test not-equals predicate on an Integer tuple field.
+     */
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    StreamITCase.testResults = mutable.MutableList()
+    val ds = StreamTestData.get3TupleDataStream(env).as('a, 'b, 'c)
+
+    val filterDs = ds.filter( 'a % 2 !== 0)
+    val results = filterDs.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+    val expected = mutable.MutableList(
+      "1,1,Hi", "3,2,Hello world",
+      "5,3,I am fine.", "7,4,Comment#1", "9,4,Comment#3",
+      "11,5,Comment#5", "13,5,Comment#7", "15,5,Comment#9",
+      "17,6,Comment#11", "19,6,Comment#13", "21,6,Comment#15")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+}


Mime
View raw message