flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From va...@apache.org
Subject [04/50] [abbrv] flink git commit: [FLINK-3223] Translate Table API calls to Calcite RelNodes.
Date Fri, 18 Mar 2016 13:47:58 GMT
[FLINK-3223] Translate Table API calls to Calcite RelNodes.

This is an intermediate step to port the Table API on top of Calcite (FLINK-3221).
This commit:
- Adds Calcite as a dependency to flink-table.
- Translates Table API calls directly into Calcite RelNodes.
- Modifies tests to check only the translation into logical plans but not the execution of Table API queries.
- Deactivates a few tests that are not supported yet.
- Removes a lot of the former Table API translation code.
- Removes bitwise operators from the Table API.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9381c430
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9381c430
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9381c430

Branch: refs/heads/master
Commit: 9381c430faf0e197bb2c96c7e418d33725e487f6
Parents: 6968d07
Author: Fabian Hueske <fhueske@apache.org>
Authored: Fri Jan 15 17:46:39 2016 +0100
Committer: vasia <vasia@apache.org>
Committed: Fri Mar 18 14:44:48 2016 +0100

----------------------------------------------------------------------
 flink-libraries/flink-table/pom.xml             |   6 +
 .../api/java/table/JavaBatchTranslator.scala    | 340 ++-----------------
 .../java/table/JavaStreamingTranslator.scala    | 241 -------------
 .../flink/api/java/table/TableEnvironment.scala |  43 +--
 .../api/scala/table/DataStreamConversions.scala |  68 ----
 .../api/scala/table/ScalaBatchTranslator.scala  |  26 +-
 .../scala/table/ScalaStreamingTranslator.scala  |  58 ----
 .../api/scala/table/TableConversions.scala      |  12 +-
 .../flink/api/scala/table/expressionDsl.scala   |   5 -
 .../apache/flink/api/scala/table/package.scala  |  21 +-
 .../org/apache/flink/api/table/Table.scala      | 236 +++++++++----
 .../table/codegen/ExpressionCodeGenerator.scala |  35 --
 .../api/table/expressions/aggregations.scala    |  40 +--
 .../analysis/ExtractEquiJoinFields.scala        |  70 ----
 .../expressions/analysis/GroupByAnalyzer.scala  |  51 ---
 .../expressions/analysis/InsertAutoCasts.scala  |  92 -----
 .../analysis/PredicateAnalyzer.scala            |  35 --
 .../analysis/ResolveFieldReferences.scala       |  60 ----
 .../analysis/SelectionAnalyzer.scala            |  36 --
 .../table/expressions/analysis/TypeCheck.scala  |  57 ----
 .../expressions/analysis/VerifyBoolean.scala    |  41 ---
 .../analysis/VerifyNoAggregates.scala           |  53 ---
 .../analysis/VerifyNoNestedAggregates.scala     |  54 ---
 .../api/table/expressions/arithmetic.scala      |  56 +--
 .../flink/api/table/expressions/cast.scala      |   5 +
 .../api/table/expressions/fieldExpression.scala |   5 +
 .../api/table/parser/ExpressionParser.scala     |  16 +-
 .../api/table/plan/ExpandAggregations.scala     | 147 --------
 .../flink/api/table/plan/PlanTranslator.scala   | 133 ++------
 .../api/table/plan/RexNodeTranslator.scala      | 184 ++++++++++
 .../flink/api/table/plan/TypeConverter.scala    |  54 +++
 .../flink/api/table/plan/operations.scala       | 134 --------
 .../api/table/plan/operators/DataSetTable.scala |  66 ++++
 .../apache/flink/api/table/plan/package.scala   |  24 --
 .../apache/flink/api/table/trees/Analyzer.scala |  43 ---
 .../org/apache/flink/api/table/trees/Rule.scala |  30 --
 .../examples/scala/StreamingTableFilter.scala   |  92 -----
 .../api/java/table/test/AggregationsITCase.java |  67 ++--
 .../flink/api/java/table/test/AsITCase.java     |  68 ++--
 .../api/java/table/test/CastingITCase.java      |  62 ++--
 .../api/java/table/test/ExpressionsITCase.java  |  83 +----
 .../flink/api/java/table/test/FilterITCase.java |  58 ++--
 .../table/test/GroupedAggregationsITCase.java   |  35 +-
 .../flink/api/java/table/test/JoinITCase.java   |  70 ++--
 .../api/java/table/test/PojoGroupingITCase.java |  18 +-
 .../flink/api/java/table/test/SelectITCase.java |  70 ++--
 .../api/java/table/test/SqlExplainITCase.java   |   7 +
 .../table/test/StringExpressionsITCase.java     |  45 +--
 .../flink/api/java/table/test/UnionITCase.java  |  44 +--
 .../scala/table/test/PageRankTableITCase.java   |   7 +-
 .../scala/table/test/TypeExceptionTest.scala    |  42 ---
 .../scala/table/test/AggregationsITCase.scala   | 104 +++---
 .../flink/api/scala/table/test/AsITCase.scala   |  74 ++--
 .../api/scala/table/test/CastingITCase.scala    |  57 ++--
 .../scala/table/test/ExpressionsITCase.scala    |  90 ++---
 .../api/scala/table/test/FilterITCase.scala     |  59 ++--
 .../table/test/GroupedAggregationsITCase.scala  |  63 ++--
 .../flink/api/scala/table/test/JoinITCase.scala |  73 ++--
 .../api/scala/table/test/SelectITCase.scala     |  99 +++---
 .../api/scala/table/test/SqlExplainITCase.scala | 198 +++++------
 .../table/test/StringExpressionsITCase.scala    |  52 +--
 .../api/scala/table/test/UnionITCase.scala      |  34 +-
 62 files changed, 1314 insertions(+), 2934 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9381c430/flink-libraries/flink-table/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/pom.xml b/flink-libraries/flink-table/pom.xml
index 2d56ec9..a9a9238 100644
--- a/flink-libraries/flink-table/pom.xml
+++ b/flink-libraries/flink-table/pom.xml
@@ -101,6 +101,12 @@ under the License.
 		</dependency>
 
 		<dependency>
+			<groupId>org.apache.calcite</groupId>
+			<artifactId>calcite-core</artifactId>
+			<version>1.5.0</version>
+		</dependency>
+
+		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-core</artifactId>
 			<version>${project.version}</version>

http://git-wip-us.apache.org/repos/asf/flink/blob/9381c430/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
index 22be45a..f3f4e9d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
@@ -18,23 +18,14 @@
 
 package org.apache.flink.api.java.table
 
-import java.lang.reflect.Modifier
-
-import org.apache.flink.api.common.operators.Keys
-import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
+import org.apache.calcite.plan.RelOptUtil
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.tools.{RelBuilder, Frameworks}
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.java.aggregation.AggregationFunction
-import org.apache.flink.api.java.operators.JoinOperator.EquiJoin
-import Keys.ExpressionKeys
-import org.apache.flink.api.java.operators.{GroupReduceOperator, MapOperator, UnsortedGrouping}
 import org.apache.flink.api.java.{DataSet => JavaDataSet}
-import org.apache.flink.api.table.expressions.analysis.ExtractEquiJoinFields
 import org.apache.flink.api.table.plan._
-import org.apache.flink.api.table.runtime._
-import org.apache.flink.api.table.expressions._
-import org.apache.flink.api.table.typeinfo.{RenameOperator, RenamingProxyTypeInfo, RowTypeInfo}
-import org.apache.flink.api.table.{ExpressionException, Row, Table}
+import org.apache.flink.api.table.plan.operators.DataSetTable
+import org.apache.flink.api.table.Table
 
 /**
  * [[PlanTranslator]] for creating [[Table]]s from Java [[org.apache.flink.api.java.DataSet]]s and
@@ -46,302 +37,41 @@ class JavaBatchTranslator extends PlanTranslator {
 
   override def createTable[A](
       repr: Representation[A],
-      inputType: CompositeType[A],
-      expressions: Array[Expression],
-      resultFields: Seq[(String, TypeInformation[_])]): Table = {
-
-    val rowDataSet = createSelect(expressions, repr, inputType)
-
-    Table(Root(rowDataSet, resultFields))
+      fieldNames: Array[String]): Table = {
+
+    // create table representation from DataSet
+    val dataSetTable = new DataSetTable[A](
+    repr.asInstanceOf[JavaDataSet[A]],
+    fieldNames
+    )
+
+    // register table in Calcite schema
+    val schema = Frameworks.createRootSchema(true)
+    val tableName = repr.hashCode().toString
+    schema.add(tableName, dataSetTable)
+
+    // initialize RelBuilder
+    val frameworkConfig = Frameworks
+      .newConfigBuilder
+      .defaultSchema(schema)
+      .build
+    val relBuilder = RelBuilder.create(frameworkConfig)
+
+    // create table scan operator
+    relBuilder.scan(tableName)
+    new Table(relBuilder.build(), relBuilder)
   }
 
-  override def translate[A](op: PlanNode)(implicit tpe: TypeInformation[A]): JavaDataSet[A] = {
-
-    if (tpe.getTypeClass == classOf[Row]) {
-      // shortcut for DataSet[Row]
-      return translateInternal(op).asInstanceOf[JavaDataSet[A]]
-    }
-
-    val clazz = tpe.getTypeClass
-    if (clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) {
-      throw new ExpressionException("Cannot create DataSet of type " +
-        clazz.getName + ". Only top-level classes or static member classes are supported.")
-    }
-
-
-    if (!implicitly[TypeInformation[A]].isInstanceOf[CompositeType[A]]) {
-      throw new ExpressionException(
-        "A Table can only be converted to composite types, type is: " +
-          implicitly[TypeInformation[A]] +
-          ". Composite types would be tuples, case classes and POJOs.")
-    }
-
-    val resultSet = translateInternal(op)
-
-    val resultType = resultSet.getType.asInstanceOf[RowTypeInfo]
+  override def translate[A](lPlan: RelNode)(implicit tpe: TypeInformation[A]): JavaDataSet[A] = {
 
-    val outputType = implicitly[TypeInformation[A]].asInstanceOf[CompositeType[A]]
+    println(RelOptUtil.toString(lPlan))
 
-    val resultNames = resultType.getFieldNames
-    val outputNames = outputType.getFieldNames.toSeq
+    // TODO: optimize & translate:
+    // - optimize RelNode plan
+    // - translate to Flink RelNode plan
+    // - generate DataSet program
 
-    if (resultNames.toSet != outputNames.toSet) {
-      throw new ExpressionException(s"Expression result type $resultType does not have the same " +
-        s"fields as output type $outputType")
-    }
-
-    for (f <- outputNames) {
-      val in = resultType.getTypeAt(resultType.getFieldIndex(f))
-      val out = outputType.getTypeAt(outputType.getFieldIndex(f))
-      if (!in.equals(out)) {
-        throw new ExpressionException(s"Types for field $f differ on input $resultType and " +
-          s"output $outputType.")
-      }
-    }
-
-    val outputFields = outputNames map {
-      f => ResolvedFieldReference(f, resultType.getTypeAt(f))
-    }
-
-    val function = new ExpressionSelectFunction(
-      resultSet.getType.asInstanceOf[RowTypeInfo],
-      outputType,
-      outputFields)
-
-    val opName = s"select(${outputFields.mkString(",")})"
-    val operator = new MapOperator(resultSet, outputType, function, opName)
-
-    operator
+    null
   }
 
-  private def translateInternal(op: PlanNode): JavaDataSet[Row] = {
-    op match {
-      case Root(dataSet: JavaDataSet[Row], resultFields) =>
-        dataSet
-
-      case Root(_, _) =>
-        throw new ExpressionException("Invalid Root for JavaBatchTranslator: " + op + ". " +
-          "Did you try converting a Table based on a DataSet to a DataStream or vice-versa?")
-
-      case GroupBy(_, fields) =>
-        throw new ExpressionException("Dangling GroupBy operation. Did you forget a " +
-          "SELECT statement?")
-
-      case As(input, newNames) =>
-        val translatedInput = translateInternal(input)
-        val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]]
-        val proxyType = new RenamingProxyTypeInfo[Row](inType, newNames.toArray)
-        new RenameOperator(translatedInput, proxyType)
-
-      case sel@Select(Filter(Join(leftInput, rightInput), predicate), selection) =>
-
-        val expandedInput = ExpandAggregations(sel)
-
-        if (expandedInput.eq(sel)) {
-          val translatedLeftInput = translateInternal(leftInput)
-          val translatedRightInput = translateInternal(rightInput)
-          val leftInType = translatedLeftInput.getType.asInstanceOf[CompositeType[Row]]
-          val rightInType = translatedRightInput.getType.asInstanceOf[CompositeType[Row]]
-
-          createJoin(
-            predicate,
-            selection,
-            translatedLeftInput,
-            translatedRightInput,
-            leftInType,
-            rightInType,
-            JoinHint.OPTIMIZER_CHOOSES)
-        } else {
-          translateInternal(expandedInput)
-        }
-
-      case Filter(Join(leftInput, rightInput), predicate) =>
-        val translatedLeftInput = translateInternal(leftInput)
-        val translatedRightInput = translateInternal(rightInput)
-        val leftInType = translatedLeftInput.getType.asInstanceOf[CompositeType[Row]]
-        val rightInType = translatedRightInput.getType.asInstanceOf[CompositeType[Row]]
-
-        createJoin(
-          predicate,
-          leftInput.outputFields.map( f => ResolvedFieldReference(f._1, f._2)) ++
-            rightInput.outputFields.map( f => ResolvedFieldReference(f._1, f._2)),
-          translatedLeftInput,
-          translatedRightInput,
-          leftInType,
-          rightInType,
-          JoinHint.OPTIMIZER_CHOOSES)
-
-      case Join(leftInput, rightInput) =>
-        throw new ExpressionException("Join without filter condition encountered. " +
-          "Did you forget to add .where(...) ?")
-
-      case sel@Select(input, selection) =>
-
-        val expandedInput = ExpandAggregations(sel)
-
-        if (expandedInput.eq(sel)) {
-          val translatedInput = input match {
-            case GroupBy(groupByInput, groupExpressions) =>
-              val translatedGroupByInput = translateInternal(groupByInput)
-              val inType = translatedGroupByInput.getType.asInstanceOf[CompositeType[Row]]
-
-              val keyIndices = groupExpressions map {
-                case fe: ResolvedFieldReference => inType.getFieldIndex(fe.name)
-                case e =>
-                  throw new ExpressionException(s"Expression $e is not a valid key expression.")
-              }
-
-              val keys = new Keys.ExpressionKeys(keyIndices.toArray, inType)
-              val grouping = new UnsortedGrouping(translatedGroupByInput, keys)
-
-              new GroupReduceOperator(
-                grouping,
-                inType,
-                new NoExpressionAggregateFunction(),
-                "Nop Expression Aggregation")
-
-            case _ => translateInternal(input)
-          }
-
-          val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]]
-          val inputFields = inType.getFieldNames
-          createSelect(
-            selection,
-            translatedInput,
-            inType)
-        } else {
-          translateInternal(expandedInput)
-        }
-
-      case agg@Aggregate(GroupBy(input, groupExpressions), aggregations) =>
-        val translatedInput = translateInternal(input)
-        val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]]
-
-        val keyIndices = groupExpressions map {
-          case fe: ResolvedFieldReference => inType.getFieldIndex(fe.name)
-          case e => throw new ExpressionException(s"Expression $e is not a valid key expression.")
-        }
-
-        val keys = new Keys.ExpressionKeys(keyIndices.toArray, inType)
-
-        val grouping = new UnsortedGrouping(translatedInput, keys)
-
-        val aggFunctions: Seq[AggregationFunction[Any]] = aggregations map {
-          case (fieldName, fun) =>
-            fun.getFactory.createAggregationFunction[Any](
-              inType.getTypeAt[Any](inType.getFieldIndex(fieldName)).getTypeClass)
-        }
-
-        val aggIndices = aggregations map {
-          case (fieldName, _) =>
-            inType.getFieldIndex(fieldName)
-        }
-
-        val result = new GroupReduceOperator(
-          grouping,
-          inType,
-          new ExpressionAggregateFunction(aggIndices, aggFunctions),
-          "Expression Aggregation: " + agg)
-
-        result
-
-      case agg@Aggregate(input, aggregations) =>
-        val translatedInput = translateInternal(input)
-        val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]]
-
-        val aggFunctions: Seq[AggregationFunction[Any]] = aggregations map {
-          case (fieldName, fun) =>
-            fun.getFactory.createAggregationFunction[Any](
-              inType.getTypeAt[Any](inType.getFieldIndex(fieldName)).getTypeClass)
-        }
-
-        val aggIndices = aggregations map {
-          case (fieldName, _) =>
-            inType.getFieldIndex(fieldName)
-        }
-
-        val result = new GroupReduceOperator(
-          translatedInput,
-          inType,
-          new ExpressionAggregateFunction(aggIndices, aggFunctions),
-          "Expression Aggregation: " + agg)
-
-        result
-
-
-      case Filter(input, predicate) =>
-        val translatedInput = translateInternal(input)
-        val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]]
-        val filter = new ExpressionFilterFunction[Row](predicate, inType)
-        translatedInput.filter(filter).name(predicate.toString)
-
-      case uni@UnionAll(left, right) =>
-        val translatedLeft = translateInternal(left)
-        val translatedRight = translateInternal(right)
-        translatedLeft.union(translatedRight).name("Union: " + uni)
-    }
-  }
-
-  private def createSelect[I](
-      fields: Seq[Expression],
-      input: JavaDataSet[I],
-      inputType: CompositeType[I]): JavaDataSet[Row] = {
-
-    fields foreach {
-      f =>
-        if (f.exists(_.isInstanceOf[Aggregation])) {
-          throw new ExpressionException("Found aggregate in " + fields.mkString(", ") + ".")
-        }
-
-    }
-
-    val resultType = new RowTypeInfo(fields)
-
-    val function = new ExpressionSelectFunction(inputType, resultType, fields)
-
-    val opName = s"select(${fields.mkString(",")})"
-    val operator = new MapOperator(input, resultType, function, opName)
-
-    operator
-  }
-
-  private def createJoin[L, R](
-      predicate: Expression,
-      fields: Seq[Expression],
-      leftInput: JavaDataSet[L],
-      rightInput: JavaDataSet[R],
-      leftType: CompositeType[L],
-      rightType: CompositeType[R],
-      joinHint: JoinHint): JavaDataSet[Row] = {
-
-    val resultType = new RowTypeInfo(fields)
-
-    val (reducedPredicate, leftFields, rightFields) =
-      ExtractEquiJoinFields(leftType, rightType, predicate)
-
-    if (leftFields.isEmpty || rightFields.isEmpty) {
-      throw new ExpressionException("Could not derive equi-join predicates " +
-        "for predicate " + predicate + ".")
-    }
-
-    val leftKey = new ExpressionKeys[L](leftFields, leftType)
-    val rightKey = new ExpressionKeys[R](rightFields, rightType)
-
-    val joiner = new ExpressionJoinFunction[L, R, Row](
-      reducedPredicate,
-      leftType,
-      rightType,
-      resultType,
-      fields)
-
-    new EquiJoin[L, R, Row](
-      leftInput,
-      rightInput,
-      leftKey,
-      rightKey,
-      joiner,
-      resultType,
-      joinHint,
-      predicate.toString)
-  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9381c430/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaStreamingTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaStreamingTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaStreamingTranslator.scala
deleted file mode 100644
index a37c892..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaStreamingTranslator.scala
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.table
-
-import java.lang.reflect.Modifier
-import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.table.plan._
-import org.apache.flink.api.table.runtime.{ExpressionFilterFunction, ExpressionSelectFunction}
-import org.apache.flink.api.table.expressions._
-import org.apache.flink.api.table.typeinfo.RowTypeInfo
-import org.apache.flink.api.table.{ExpressionException, Row, Table}
-import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.streaming.api.operators.StreamMap
-
-/**
- * [[PlanTranslator]] for creating [[Table]]s from Java [[DataStream]]s and
- * translating them back to Java [[DataStream]]s.
- *
- * This is very limited right now. Only select and filter are implemented. Also, the expression
- * operations must be extended to allow windowing operations.
- */
-
-class JavaStreamingTranslator extends PlanTranslator {
-
-  type Representation[A] = DataStream[A]
-
-  override def createTable[A](
-      repr: Representation[A],
-      inputType: CompositeType[A],
-      expressions: Array[Expression],
-      resultFields: Seq[(String, TypeInformation[_])]): Table = {
-
-    val rowDataStream = createSelect(expressions, repr, inputType)
-
-    new Table(Root(rowDataStream, resultFields))
-  }
-
-  override def translate[A](op: PlanNode)(implicit tpe: TypeInformation[A]): DataStream[A] = {
-
-    if (tpe.getTypeClass == classOf[Row]) {
-      // shortcut for DataSet[Row]
-      return translateInternal(op).asInstanceOf[DataStream[A]]
-    }
-
-    val clazz = tpe.getTypeClass
-    if (clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) {
-      throw new ExpressionException("Cannot create DataStream of type " +
-        clazz.getName + ". Only top-level classes or static member classes are supported.")
-    }
-
-    if (!implicitly[TypeInformation[A]].isInstanceOf[CompositeType[A]]) {
-      throw new ExpressionException(
-        "A Table can only be converted to composite types, type is: " +
-          implicitly[TypeInformation[A]] +
-          ". Composite types would be tuples, case classes and POJOs.")
-
-    }
-
-    val resultSet = translateInternal(op)
-
-    val resultType = resultSet.getType.asInstanceOf[RowTypeInfo]
-
-    val outputType = implicitly[TypeInformation[A]].asInstanceOf[CompositeType[A]]
-
-    val resultNames = resultType.getFieldNames
-    val outputNames = outputType.getFieldNames.toSeq
-
-    if (resultNames.toSet != outputNames.toSet) {
-      throw new ExpressionException(s"Expression result type $resultType does not have the same" +
-        s"fields as output type $outputType")
-    }
-
-    for (f <- outputNames) {
-      val in = resultType.getTypeAt(resultType.getFieldIndex(f))
-      val out = outputType.getTypeAt(outputType.getFieldIndex(f))
-      if (!in.equals(out)) {
-        throw new ExpressionException(s"Types for field $f differ on input $resultType and " +
-          s"output $outputType.")
-      }
-    }
-
-    val outputFields = outputNames map {
-      f => ResolvedFieldReference(f, resultType.getTypeAt(f))
-    }
-
-    val function = new ExpressionSelectFunction(
-      resultSet.getType.asInstanceOf[RowTypeInfo],
-      outputType,
-      outputFields)
-
-    val opName = s"select(${outputFields.mkString(",")})"
-
-    resultSet.transform(opName, outputType, new StreamMap[Row, A](function))
-  }
-
-  private def translateInternal(op: PlanNode): DataStream[Row] = {
-    op match {
-      case Root(dataSet: DataStream[Row], resultFields) =>
-        dataSet
-
-      case Root(_, _) =>
-        throw new ExpressionException("Invalid Root for JavaStreamingTranslator: " + op + ". " +
-          "Did you try converting a Table based on a DataSet to a DataStream or vice-versa?")
-
-      case GroupBy(_, fields) =>
-        throw new ExpressionException("Dangling GroupBy operation. Did you forget a " +
-          "SELECT statement?")
-
-      case As(input, newNames) =>
-        throw new ExpressionException("As operation for Streams not yet implemented.")
-
-      case sel@Select(Filter(Join(leftInput, rightInput), predicate), selection) =>
-
-        val expandedInput = ExpandAggregations(sel)
-
-        if (expandedInput.eq(sel)) {
-          val translatedLeftInput = translateInternal(leftInput)
-          val translatedRightInput = translateInternal(rightInput)
-          val leftInType = translatedLeftInput.getType.asInstanceOf[CompositeType[Row]]
-          val rightInType = translatedRightInput.getType.asInstanceOf[CompositeType[Row]]
-
-          createJoin(
-            predicate,
-            selection,
-            translatedLeftInput,
-            translatedRightInput,
-            leftInType,
-            rightInType,
-            JoinHint.OPTIMIZER_CHOOSES)
-        } else {
-          translateInternal(expandedInput)
-        }
-
-      case Filter(Join(leftInput, rightInput), predicate) =>
-        val translatedLeftInput = translateInternal(leftInput)
-        val translatedRightInput = translateInternal(rightInput)
-        val leftInType = translatedLeftInput.getType.asInstanceOf[CompositeType[Row]]
-        val rightInType = translatedRightInput.getType.asInstanceOf[CompositeType[Row]]
-
-        createJoin(
-          predicate,
-          leftInput.outputFields.map( f => ResolvedFieldReference(f._1, f._2)) ++
-            rightInput.outputFields.map( f => ResolvedFieldReference(f._1, f._2)),
-          translatedLeftInput,
-          translatedRightInput,
-          leftInType,
-          rightInType,
-          JoinHint.OPTIMIZER_CHOOSES)
-
-      case Join(leftInput, rightInput) =>
-        throw new ExpressionException("Join without filter condition encountered. " +
-          "Did you forget to add .where(...) ?")
-
-      case sel@Select(input, selection) =>
-
-        val expandedInput = ExpandAggregations(sel)
-
-        if (expandedInput.eq(sel)) {
-          // no expansions took place
-          val translatedInput = translateInternal(input)
-          val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]]
-          val inputFields = inType.getFieldNames
-          createSelect(
-            selection,
-            translatedInput,
-            inType)
-        } else {
-          translateInternal(expandedInput)
-        }
-
-      case agg@Aggregate(GroupBy(input, groupExpressions), aggregations) =>
-        throw new ExpressionException("Aggregate operation for Streams not yet implemented.")
-
-      case agg@Aggregate(input, aggregations) =>
-        throw new ExpressionException("Aggregate operation for Streams not yet implemented.")
-
-      case Filter(input, predicate) =>
-        val translatedInput = translateInternal(input)
-        val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]]
-        val filter = new ExpressionFilterFunction[Row](predicate, inType)
-        translatedInput.filter(filter)
-
-      case UnionAll(left, right) =>
-        val translatedLeft = translateInternal(left)
-        val translatedRight = translateInternal(right)
-        translatedLeft.union(translatedRight)
-    }
-  }
-
-  private def createSelect[I](
-      fields: Seq[Expression],
-      input: DataStream[I],
-      inputType: CompositeType[I]): DataStream[Row] = {
-
-    fields foreach {
-      f =>
-        if (f.exists(_.isInstanceOf[Aggregation])) {
-          throw new ExpressionException("Found aggregate in " + fields.mkString(", ") + ".")
-        }
-
-    }
-
-    val resultType = new RowTypeInfo(fields)
-
-    val function = new ExpressionSelectFunction(inputType, resultType, fields)
-
-    val opName = s"select(${fields.mkString(",")})"
-
-    input.transform(opName, resultType, new StreamMap[I, Row](function))
-  }
-
-  private def createJoin[L, R](
-      predicate: Expression,
-      fields: Seq[Expression],
-      leftInput: DataStream[L],
-      rightInput: DataStream[R],
-      leftType: CompositeType[L],
-      rightType: CompositeType[R],
-      joinHint: JoinHint): DataStream[Row] = {
-
-    throw new ExpressionException("Join operation for Streams not yet implemented.")
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9381c430/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/TableEnvironment.scala
index 5614031..01e38db 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/TableEnvironment.scala
@@ -21,12 +21,11 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
 import org.apache.flink.api.java.typeutils.TypeExtractor
 import org.apache.flink.api.table.Table
-import org.apache.flink.streaming.api.datastream.DataStream
 
 /**
  * Environment for working with the Table API.
  *
- * This can be used to convert [[DataSet]] or [[DataStream]] to a [[Table]] and back again. You
+ * This can be used to convert a [[DataSet]] to a [[Table]] and back again. You
  * can also use the provided methods to create a [[Table]] directly from a data source.
  */
 class TableEnvironment {
@@ -58,32 +57,6 @@ class TableEnvironment {
   }
 
   /**
-   * Transforms the given DataStream to a [[org.apache.flink.api.table.Table]].
-   * The fields of the DataStream type are renamed to the given set of fields:
-   *
-   * Example:
-   *
-   * {{{
-   *   tableEnv.fromDataStream(set, "a, b")
-   * }}}
-   *
-   * This will transform the set containing elements of two fields to a table where the fields
-   * are named a and b.
-   */
-  def fromDataStream[T](set: DataStream[T], fields: String): Table = {
-    new JavaStreamingTranslator().createTable(set, fields)
-  }
-
-  /**
-   * Transforms the given DataStream to a [[org.apache.flink.api.table.Table]].
-   * The fields of the DataStream type are used to name the
-   * [[org.apache.flink.api.table.Table]] fields.
-   */
-  def fromDataStream[T](set: DataStream[T]): Table = {
-    new JavaStreamingTranslator().createTable(set)
-  }
-
-  /**
    * Converts the given [[org.apache.flink.api.table.Table]] to
    * a DataSet. The given type must have exactly the same fields as the
    * [[org.apache.flink.api.table.Table]]. That is, the names of the
@@ -91,21 +64,9 @@ class TableEnvironment {
    */
   @SuppressWarnings(Array("unchecked"))
   def toDataSet[T](table: Table, clazz: Class[T]): DataSet[T] = {
-    new JavaBatchTranslator().translate[T](table.operation)(
+    new JavaBatchTranslator().translate[T](table.relNode)(
       TypeExtractor.createTypeInfo(clazz).asInstanceOf[TypeInformation[T]])
   }
 
-  /**
-   * Converts the given [[org.apache.flink.api.table.Table]] to
-   * a DataStream. The given type must have exactly the same fields as the
-   * [[org.apache.flink.api.table.Table]]. That is, the names of the
-   * fields and the types must match.
-   */
-  @SuppressWarnings(Array("unchecked"))
-  def toDataStream[T](table: Table, clazz: Class[T]): DataStream[T] = {
-    new JavaStreamingTranslator().translate[T](table.operation)(
-      TypeExtractor.createTypeInfo(clazz).asInstanceOf[TypeInformation[T]])
-
-  }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9381c430/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataStreamConversions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataStreamConversions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataStreamConversions.scala
deleted file mode 100644
index 47bd100..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataStreamConversions.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.table
-
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.table._
-import org.apache.flink.api.table.expressions.{Expression, UnresolvedFieldReference}
-import org.apache.flink.streaming.api.scala.DataStream
-
-class DataStreamConversions[T](stream: DataStream[T], inputType: CompositeType[T]) {
-
-  /**
-   * Converts the [[DataStream]] to a [[Table]]. The field names can be specified like this:
-   *
-   * {{{
-   *   val in: DataSet[(String, Int)] = ...
-   *   val table = in.as('a, 'b)
-   * }}}
-   *
-   * This results in a [[Table]] that has field `a` of type `String` and field `b`
-   * of type `Int`.
-   */
-
-  def as(fields: Expression*): Table = {
-     new ScalaStreamingTranslator().createTable(
-       stream,
-       fields.toArray,
-       checkDeterministicFields = true)
-  }
-
-  /**
-   * Converts the [[DataStream]] to a [[Table]]. The field names will be taken from the field
-   * names of the input type.
-   *
-   * Example:
-   *
-   * {{{
-   *   val in: DataSet[(String, Int)] = ...
-   *   val table = in.toTable
-   * }}}
-   *
-   * This results in a [[Table]] that has field `_1` of type `String` and field `_2`
-   * of type `Int`.
-   */
-
-  def toTable: Table = {
-    val resultFields = inputType.getFieldNames.map(UnresolvedFieldReference)
-    as(resultFields: _*)
-  }
-
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/9381c430/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaBatchTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaBatchTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaBatchTranslator.scala
index cdcf53e..1c453fa 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaBatchTranslator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaBatchTranslator.scala
@@ -18,10 +18,8 @@
 
 package org.apache.flink.api.scala.table
 
-
-import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.calcite.rel.RelNode
 import org.apache.flink.api.java.table.JavaBatchTranslator
-import org.apache.flink.api.table.expressions.Expression
 import org.apache.flink.api.scala.wrap
 import org.apache.flink.api.table.plan._
 import org.apache.flink.api.table.Table
@@ -30,7 +28,6 @@ import org.apache.flink.api.scala.DataSet
 
 import scala.reflect.ClassTag
 
-
 /**
  * [[PlanTranslator]] for creating [[Table]]s from Scala [[DataSet]]s and
  * translating them back to Scala [[DataSet]]s.
@@ -41,28 +38,13 @@ class ScalaBatchTranslator extends PlanTranslator {
 
   type Representation[A] = DataSet[A]
 
-  def createTable[A](
-      repr: DataSet[A],
-      fields: Array[Expression]): Table = {
-
-    val result = javaTranslator.createTable(repr.javaSet, fields)
-
-    new Table(result.operation)
+  override def createTable[A](repr: Representation[A], fieldNames: Array[String]): Table = {
+    javaTranslator.createTable(repr.javaSet, fieldNames)
   }
 
-  override def translate[O](op: PlanNode)(implicit tpe: TypeInformation[O]): DataSet[O] = {
+  override def translate[O](op: RelNode)(implicit tpe: TypeInformation[O]): DataSet[O] = {
     // fake it till you make it ...
     wrap(javaTranslator.translate(op))(ClassTag.AnyRef.asInstanceOf[ClassTag[O]])
   }
 
-  override def createTable[A](
-      repr: Representation[A],
-      inputType: CompositeType[A],
-      expressions: Array[Expression],
-      resultFields: Seq[(String, TypeInformation[_])]): Table = {
-
-    val result = javaTranslator.createTable(repr.javaSet, inputType, expressions, resultFields)
-
-    Table(result.operation)
-  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9381c430/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaStreamingTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaStreamingTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaStreamingTranslator.scala
deleted file mode 100644
index 091d893..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaStreamingTranslator.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.table
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.java.table.JavaStreamingTranslator
-import org.apache.flink.api.table.Table
-import org.apache.flink.api.table.plan._
-import org.apache.flink.api.table.expressions.Expression
-import org.apache.flink.streaming.api.scala.{DataStream, asScalaStream}
-
-/**
- * [[PlanTranslator]] for creating [[Table]]s from Scala [[DataStream]]s and
- * translating them back to Scala [[DataStream]]s.
- *
- * This is very limited right now. Only select and filter are implemented. Also, the expression
- * operations must be extended to allow windowing operations.
- */
-class ScalaStreamingTranslator extends PlanTranslator {
-
-  private val javaTranslator = new JavaStreamingTranslator
-
-  override type Representation[A] = DataStream[A]
-
-  override def translate[O](op: PlanNode)(implicit tpe: TypeInformation[O]): DataStream[O] = {
-    // fake it till you make it ...
-    asScalaStream(javaTranslator.translate(op))
-  }
-
-  override def createTable[A](
-      repr: Representation[A],
-      inputType: CompositeType[A],
-      expressions: Array[Expression],
-      resultFields: Seq[(String, TypeInformation[_])]): Table = {
-
-    val result =
-      javaTranslator.createTable(repr.javaStream, inputType, expressions, resultFields)
-
-    new Table(result.operation)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9381c430/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableConversions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableConversions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableConversions.scala
index 4f2172e..fdcd804 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableConversions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableConversions.scala
@@ -22,10 +22,8 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala._
 import org.apache.flink.api.table._
 
-import org.apache.flink.streaming.api.scala.DataStream
-
 /**
- * Methods for converting a [[Table]] to a [[DataSet]] or [[DataStream]]. A [[Table]] is
+ * Methods for converting a [[Table]] to a [[DataSet]]. A [[Table]] is
  * wrapped in this by the implicit conversions in [[org.apache.flink.api.scala.table]].
  */
 class TableConversions(table: Table) {
@@ -34,14 +32,8 @@ class TableConversions(table: Table) {
    * Converts the [[Table]] to a [[DataSet]].
    */
   def toDataSet[T: TypeInformation]: DataSet[T] = {
-     new ScalaBatchTranslator().translate[T](table.operation)
+     new ScalaBatchTranslator().translate[T](table.relNode)
   }
 
-  /**
-   * Converts the [[Table]] to a [[DataStream]].
-   */
-  def toDataStream[T: TypeInformation]: DataStream[T] = {
-    new ScalaStreamingTranslator().translate[T](table.operation)
-  }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9381c430/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
index 0be6be2..058ff0e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
@@ -55,11 +55,6 @@ trait ImplicitExpressionOperations {
   def * (other: Expression) = Mul(expr, other)
   def % (other: Expression) = Mod(expr, other)
 
-  def & (other: Expression) = BitwiseAnd(expr, other)
-  def | (other: Expression) = BitwiseOr(expr, other)
-  def ^ (other: Expression) = BitwiseXor(expr, other)
-  def unary_~ = BitwiseNot(expr)
-
   def abs = Abs(expr)
 
   def sum = Sum(expr)

http://git-wip-us.apache.org/repos/asf/flink/blob/9381c430/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala
index 31373a3..86bb7c0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala
@@ -19,7 +19,6 @@ package org.apache.flink.api.scala
 
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.api.table.{Row, Table}
-import org.apache.flink.streaming.api.scala.DataStream
 
 import scala.language.implicitConversions
 
@@ -32,7 +31,7 @@ import scala.language.implicitConversions
  *   import org.apache.flink.api.scala.table._
  * }}}
  *
- * imports implicit conversions for converting a [[DataSet]] or [[DataStream]] to a
+ * imports implicit conversions for converting a [[DataSet]] to a
  * [[Table]]. This can be used to perform SQL-like queries on data. Please have
  * a look at [[Table]] to see which operations are supported and
  * [[org.apache.flink.api.scala.table.ImplicitExpressionOperations]] to see how an
@@ -78,7 +77,7 @@ package object table extends ImplicitExpressionConversions {
 
   implicit def table2RowDataSet(
       table: Table): DataSet[Row] = {
-    new ScalaBatchTranslator().translate[Row](table.operation)
+    new ScalaBatchTranslator().translate[Row](table.relNode)
   }
 
   implicit def rowDataSet2Table(
@@ -86,20 +85,4 @@ package object table extends ImplicitExpressionConversions {
     rowDataSet.toTable
   }
 
-  implicit def dataStream2DataSetConversions[T](
-      stream: DataStream[T]): DataStreamConversions[T] = {
-    new DataStreamConversions[T](
-      stream,
-      stream.javaStream.getType.asInstanceOf[CompositeType[T]])
-  }
-
-  implicit def table2RowDataStream(
-      table: Table): DataStream[Row] = {
-    new ScalaStreamingTranslator().translate[Row](table.operation)
-  }
-
-  implicit def rowDataStream2Table(
-      rowDataStream: DataStream[Row]): Table = {
-    rowDataStream.toTable
-  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9381c430/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala
index 641f2fa..271aa99 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala
@@ -17,14 +17,22 @@
  */
 package org.apache.flink.api.table
 
-import org.apache.flink.api.java.io.DiscardingOutputFormat
-import org.apache.flink.api.table.explain.PlanJsonParser
-import org.apache.flink.api.table.expressions.analysis.{GroupByAnalyzer, PredicateAnalyzer, SelectionAnalyzer}
-import org.apache.flink.api.table.expressions.{Expression, ResolvedFieldReference, UnresolvedFieldReference}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataTypeField
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.tools.RelBuilder
+import org.apache.calcite.tools.RelBuilder.{AggCall, GroupKey}
+import org.apache.flink.api.table.plan.RexNodeTranslator
+import RexNodeTranslator.{toRexNode, extractAggCalls}
+import org.apache.flink.api.table.expressions.Expression
 import org.apache.flink.api.table.parser.ExpressionParser
-import org.apache.flink.api.table.plan._
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
+
+import scala.collection.JavaConverters._
+
+case class BaseTable(
+    private[flink] val relNode: RelNode,
+    private[flink] val relBuilder: RelBuilder)
 
 /**
  * The abstraction for writing Table API programs. Similar to how the batch and streaming APIs
@@ -50,7 +58,11 @@ import org.apache.flink.api.scala.table._
  * in a Scala DSL or as an expression String. Please refer to the documentation for the expression
  * syntax.
  */
-case class Table(private[flink] val operation: PlanNode) {
+class Table(
+  private[flink] override val relNode: RelNode,
+  private[flink] override val relBuilder: RelBuilder)
+  extends BaseTable(relNode, relBuilder)
+{
 
   /**
    * Performs a selection operation. Similar to an SQL SELECT statement. The field expressions
@@ -63,14 +75,30 @@ case class Table(private[flink] val operation: PlanNode) {
    * }}}
    */
   def select(fields: Expression*): Table = {
-    val analyzer = new SelectionAnalyzer(operation.outputFields)
-    val analyzedFields = fields.map(analyzer.analyze)
-    val fieldNames = analyzedFields map(_.name)
-    if (fieldNames.toSet.size != fieldNames.size) {
-      throw new ExpressionException(s"Resulting fields names are not unique in expression" +
-        s""" "${fields.mkString(", ")}".""")
+
+    relBuilder.push(relNode)
+
+    // separate aggregations and selection expressions
+    val extractedAggCalls: List[(Expression, List[AggCall])] = fields
+      .map(extractAggCalls(_, relBuilder)).toList
+
+    // get aggregation calls
+    val aggCalls: List[AggCall] = extractedAggCalls
+      .map(_._2).reduce( (x,y) => x ::: y)
+
+    // apply aggregations
+    if (aggCalls.nonEmpty) {
+      val emptyKey: GroupKey = relBuilder.groupKey()
+      relBuilder.aggregate(emptyKey, aggCalls.toIterable.asJava)
     }
-    this.copy(operation = Select(operation, analyzedFields))
+
+    // get selection expressions
+    val exprs: List[RexNode] = extractedAggCalls
+      .map(_._1)
+      .map(toRexNode(_, relBuilder))
+
+    relBuilder.project(exprs.toIterable.asJava)
+    new Table(relBuilder.build(), relBuilder)
   }
 
   /**
@@ -99,13 +127,12 @@ case class Table(private[flink] val operation: PlanNode) {
    * }}}
    */
   def as(fields: Expression*): Table = {
-    fields forall {
-      f => f.isInstanceOf[UnresolvedFieldReference]
-    } match {
-      case true =>
-      case false => throw new ExpressionException("Only field expression allowed in as().")
-    }
-    this.copy(operation = As(operation, fields.toArray map { _.name }))
+
+    relBuilder.push(relNode)
+    val expressions = fields.map(toRexNode(_, relBuilder)).toIterable.asJava
+    val names = fields.map(_.name).toIterable.asJava
+    relBuilder.project(expressions, names)
+    new Table(relBuilder.build(), relBuilder)
   }
 
   /**
@@ -134,9 +161,11 @@ case class Table(private[flink] val operation: PlanNode) {
    * }}}
    */
   def filter(predicate: Expression): Table = {
-    val analyzer = new PredicateAnalyzer(operation.outputFields)
-    val analyzedPredicate = analyzer.analyze(predicate)
-    this.copy(operation = Filter(operation, analyzedPredicate))
+
+    relBuilder.push(relNode)
+    val pred = toRexNode(predicate, relBuilder)
+    relBuilder.filter(pred)
+    new Table(relBuilder.build(), relBuilder)
   }
 
   /**
@@ -192,20 +221,13 @@ case class Table(private[flink] val operation: PlanNode) {
    *   in.groupBy('key).select('key, 'value.avg)
    * }}}
    */
-  def groupBy(fields: Expression*): Table = {
-    val analyzer = new GroupByAnalyzer(operation.outputFields)
-    val analyzedFields = fields.map(analyzer.analyze)
+  def groupBy(fields: Expression*): GroupedTable = {
 
-    val illegalKeys = analyzedFields filter {
-      case fe: ResolvedFieldReference => false // OK
-      case e => true
-    }
+    relBuilder.push(relNode)
+    val groupExpr = fields.map(toRexNode(_, relBuilder)).toIterable.asJava
+    val groupKey = relBuilder.groupKey(groupExpr)
 
-    if (illegalKeys.nonEmpty) {
-      throw new ExpressionException("Illegal key expressions: " + illegalKeys.mkString(", "))
-    }
-
-    this.copy(operation = GroupBy(operation, analyzedFields))
+    new GroupedTable(relBuilder.build(), relBuilder, groupKey)
   }
 
   /**
@@ -218,7 +240,7 @@ case class Table(private[flink] val operation: PlanNode) {
    *   in.groupBy("key").select("key, value.avg")
    * }}}
    */
-  def groupBy(fields: String): Table = {
+  def groupBy(fields: String): GroupedTable = {
     val fieldsExpr = ExpressionParser.parseExpressionList(fields)
     groupBy(fieldsExpr: _*)
   }
@@ -235,16 +257,21 @@ case class Table(private[flink] val operation: PlanNode) {
    * }}}
    */
   def join(right: Table): Table = {
-    val leftInputNames = operation.outputFields.map(_._1).toSet
-    val rightInputNames = right.operation.outputFields.map(_._1).toSet
-    if (leftInputNames.intersect(rightInputNames).nonEmpty) {
-      throw new ExpressionException(
-        "Overlapping fields names on join input, result would be ambiguous: " +
-          operation.outputFields.mkString(", ") +
-          " and " +
-          right.operation.outputFields.mkString(", ") )
+
+    // check that join inputs do not have overlapping field names
+    val leftFields = relNode.getRowType.getFieldNames.asScala.toSet
+    val rightFields = right.relNode.getRowType.getFieldNames.asScala.toSet
+    if (leftFields.intersect(rightFields).nonEmpty) {
+      throw new IllegalArgumentException("Overlapping fields names on join input.")
     }
-    this.copy(operation = Join(operation, right.operation))
+
+    relBuilder.push(relNode)
+    relBuilder.push(right.relNode)
+
+    relBuilder.join(JoinRelType.INNER, relBuilder.literal(true))
+    val join = relBuilder.build()
+    val rowT = join.getRowType()
+    new Table(join, relBuilder)
   }
 
   /**
@@ -258,17 +285,27 @@ case class Table(private[flink] val operation: PlanNode) {
    * }}}
    */
   def unionAll(right: Table): Table = {
-    val leftInputFields = operation.outputFields
-    val rightInputFields = right.operation.outputFields
-    if (!leftInputFields.equals(rightInputFields)) {
-      throw new ExpressionException(
-        "The fields names of join inputs should be fully overlapped, left inputs fields:" +
-          operation.outputFields.mkString(", ") +
-          " and right inputs fields" +
-          right.operation.outputFields.mkString(", ")
-      )
+
+    val leftRowType: List[RelDataTypeField] = relNode.getRowType.getFieldList.asScala.toList
+    val rightRowType: List[RelDataTypeField] = right.relNode.getRowType.getFieldList.asScala.toList
+
+    if (leftRowType.length != rightRowType.length) {
+      throw new IllegalArgumentException("Unioned tables have varying row schema.")
+    }
+    else {
+      val zipped: List[(RelDataTypeField, RelDataTypeField)] = leftRowType.zip(rightRowType)
+      zipped.foreach { case (x, y) =>
+        if (!x.getName.equals(y.getName) || x.getType != y.getType) {
+          throw new IllegalArgumentException("Unioned tables have varying row schema.")
+        }
+      }
     }
-    this.copy(operation = UnionAll(operation, right.operation))
+
+    relBuilder.push(relNode)
+    relBuilder.push(right.relNode)
+
+    relBuilder.union(true)
+    new Table(relBuilder.build(), relBuilder)
   }
 
   /**
@@ -277,18 +314,79 @@ case class Table(private[flink] val operation: PlanNode) {
    * referenced by the statement will be scanned.
    */
   def explain(extended: Boolean): String = {
-    val ast = operation
-    val dataSet = this.toDataSet[Row]
-    val env = dataSet.getExecutionEnvironment
-    dataSet.output(new DiscardingOutputFormat[Row])
-    val jasonSqlPlan = env.getExecutionPlan()
-    val sqlPlan = PlanJsonParser.getSqlExecutionPlan(jasonSqlPlan, extended)
-    val result = "== Abstract Syntax Tree ==\n" + ast + "\n\n" + "== Physical Execution Plan ==" +
-      "\n" + sqlPlan
-    return result
+
+    // TODO: enable once toDataSet() is working again
+
+//    val ast = operation
+//    val dataSet = this.toDataSet[Row]
+//    val env = dataSet.getExecutionEnvironment
+//    dataSet.output(new DiscardingOutputFormat[Row])
+//    val jasonSqlPlan = env.getExecutionPlan()
+//    val sqlPlan = PlanJsonParser.getSqlExecutionPlan(jasonSqlPlan, extended)
+//    val result = "== Abstract Syntax Tree ==\n" + ast + "\n\n" + "== Physical Execution Plan ==" +
+//      "\n" + sqlPlan
+//    return result
+
+    ""
   }
-  
+
   def explain(): String = explain(false)
-  
-  override def toString: String = s"Expression($operation)"
+}
+
+class GroupedTable(
+    private[flink] override val relNode: RelNode,
+    private[flink] override val relBuilder: RelBuilder,
+    private[flink] val groupKey: GroupKey) extends BaseTable(relNode, relBuilder) {
+
+  /**
+    * Performs a selection operation. Similar to an SQL SELECT statement. The field expressions
+    * can contain complex expressions and aggregations.
+    *
+    * Example:
+    *
+    * {{{
+    *   in.select('key, 'value.avg + " The average" as 'average, 'other.substring(0, 10))
+    * }}}
+    */
+  def select(fields: Expression*): Table = {
+
+    relBuilder.push(relNode)
+
+    // separate aggregations and selection expressions
+    val extractedAggCalls: List[(Expression, List[AggCall])] = fields
+      .map(extractAggCalls(_, relBuilder)).toList
+
+    // get aggregation calls
+    val aggCalls: List[AggCall] = extractedAggCalls
+      .map(_._2).reduce( (x,y) => x ::: y)
+
+    // apply aggregations
+    if (aggCalls.nonEmpty) {
+      relBuilder.aggregate(groupKey, aggCalls.toIterable.asJava)
+    }
+
+    // get selection expressions
+    val exprs: List[RexNode] = extractedAggCalls
+      .map(_._1)
+      .map(toRexNode(_, relBuilder))
+
+    relBuilder.project(exprs.toIterable.asJava)
+    new Table(relBuilder.build(), relBuilder)
+  }
+
+  /**
+    * Performs a selection operation. Similar to an SQL SELECT statement. The field expressions
+    * can contain complex expressions and aggregations.
+    *
+    * Example:
+    *
+    * {{{
+    *   in.select("key, value.avg + \" The average\" as average, other.substring(0, 10)")
+    * }}}
+    */
+  def select(fields: String): Table = {
+    val fieldExprs = ExpressionParser.parseExpressionList(fields)
+    select(fieldExprs: _*)
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9381c430/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala
index a03ba61..9592f2e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala
@@ -561,41 +561,6 @@ abstract class ExpressionCodeGenerator[R](
             """.stripMargin
         }
 
-      case BitwiseAnd(left, right) =>
-        generateIfNonNull(left, right, expr.typeInfo) {
-          (leftTerm, rightTerm) => s"(int) $leftTerm & (int) $rightTerm"
-        }
-
-      case BitwiseOr(left, right) =>
-        generateIfNonNull(left, right, expr.typeInfo) {
-          (leftTerm, rightTerm) => s"(int) $leftTerm | (int) $rightTerm"
-        }
-
-      case BitwiseXor(left, right) =>
-        generateIfNonNull(left, right, expr.typeInfo) {
-          (leftTerm, rightTerm) => s"(int) $leftTerm ^ (int) $rightTerm"
-        }
-
-      case BitwiseNot(child) =>
-        val childCode = generateExpression(child)
-        if (nullCheck) {
-          childCode.code +
-            s"""
-              |boolean $nullTerm = ${childCode.nullTerm};
-              |$resultTpe $resultTerm;
-              |if ($nullTerm) {
-              |  $resultTerm = ${defaultPrimitive(child.typeInfo)};
-              |} else {
-              |  $resultTerm = ~((int) ${childCode.resultTerm});
-              |}
-            """.stripMargin
-        } else {
-          childCode.code +
-            s"""
-              |$resultTpe $resultTerm = ~((int) ${childCode.resultTerm});
-            """.stripMargin
-        }
-
       case Not(child) =>
         val childCode = generateExpression(child)
         if (nullCheck) {

http://git-wip-us.apache.org/repos/asf/flink/blob/9381c430/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala
index 08e319d..d2fbdff 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala
@@ -17,9 +17,8 @@
  */
 package org.apache.flink.api.table.expressions
 
-import org.apache.flink.api.table.ExpressionException
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.api.java.aggregation.Aggregations
+import org.apache.flink.api.table.ExpressionException
 
 
 abstract sealed class Aggregation extends UnaryExpression { self: Product =>
@@ -39,61 +38,24 @@ abstract sealed class Aggregation extends UnaryExpression { self: Product =>
   }
 
   override def toString = s"Aggregate($child)"
-
-  def getIntermediateFields: Seq[Expression]
-  def getFinalField(inputs: Seq[Expression]): Expression
-  def getAggregations: Seq[Aggregations]
 }
 
 case class Sum(child: Expression) extends Aggregation {
   override def toString = s"($child).sum"
-
-  override def getIntermediateFields: Seq[Expression] = Seq(child)
-  override def getFinalField(inputs: Seq[Expression]): Expression = inputs(0)
-  override def getAggregations = Seq(Aggregations.SUM)
 }
 
 case class Min(child: Expression) extends Aggregation {
   override def toString = s"($child).min"
-
-  override def getIntermediateFields: Seq[Expression] = Seq(child)
-  override def getFinalField(inputs: Seq[Expression]): Expression = inputs(0)
-  override def getAggregations = Seq(Aggregations.MIN)
-
 }
 
 case class Max(child: Expression) extends Aggregation {
   override def toString = s"($child).max"
-
-  override def getIntermediateFields: Seq[Expression] = Seq(child)
-  override def getFinalField(inputs: Seq[Expression]): Expression = inputs(0)
-  override def getAggregations = Seq(Aggregations.MAX)
 }
 
 case class Count(child: Expression) extends Aggregation {
-  override def typeInfo = {
-    child.typeInfo match {
-      case _ => // we can count anything... :D
-    }
-    BasicTypeInfo.INT_TYPE_INFO
-  }
-
   override def toString = s"($child).count"
-
-  override def getIntermediateFields: Seq[Expression] = Seq(Literal(Integer.valueOf(1)))
-  override def getFinalField(inputs: Seq[Expression]): Expression = inputs(0)
-  override def getAggregations = Seq(Aggregations.SUM)
-
 }
 
 case class Avg(child: Expression) extends Aggregation {
   override def toString = s"($child).avg"
-
-  override def getIntermediateFields: Seq[Expression] = Seq(child, Literal(1))
-  // This is just sweet. Use our own AST representation and let the code generator do
-  // our dirty work.
-  override def getFinalField(inputs: Seq[Expression]): Expression =
-    Div(inputs(0), inputs(1))
-  override def getAggregations = Seq(Aggregations.SUM, Aggregations.SUM)
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9381c430/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/ExtractEquiJoinFields.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/ExtractEquiJoinFields.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/ExtractEquiJoinFields.scala
deleted file mode 100644
index 797de55..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/ExtractEquiJoinFields.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.expressions.analysis
-
-import org.apache.flink.api.table.expressions._
-import org.apache.flink.api.common.typeutils.CompositeType
-
-import scala.collection.mutable
-
-/**
- * Equi-join field extractor for Join Predicates and CoGroup predicates. The result is a modified
- * expression without the equi-join predicates together with indices of the join fields
- * from both the left and right input.
- */
-object ExtractEquiJoinFields {
-  def apply(leftType: CompositeType[_], rightType: CompositeType[_], predicate: Expression) = {
-
-    val joinFieldsLeft = mutable.MutableList[Int]()
-    val joinFieldsRight = mutable.MutableList[Int]()
-
-    val equiJoinExprs = mutable.MutableList[EqualTo]()
-    // First get all `===` expressions that are not below an `Or`
-    predicate.transformPre {
-      case or@Or(_, _) => NopExpression()
-      case eq@EqualTo(le: ResolvedFieldReference, re: ResolvedFieldReference) =>
-        if (leftType.hasField(le.name) && rightType.hasField(re.name)) {
-          joinFieldsLeft += leftType.getFieldIndex(le.name)
-          joinFieldsRight += rightType.getFieldIndex(re.name)
-        } else if (leftType.hasField(re.name) && rightType.hasField(le.name)) {
-          joinFieldsLeft += leftType.getFieldIndex(re.name)
-          joinFieldsRight += rightType.getFieldIndex(le.name)
-        } else {
-          // not an equi-join predicate
-        }
-        equiJoinExprs += eq
-        eq
-    }
-
-    // then remove the equi join expressions from the predicate
-    val resultExpr = predicate.transformPost {
-      // For OR, we can eliminate the OR since the equi join
-      // predicate is evaluated before the expression is evaluated
-      case or@Or(NopExpression(), _) => NopExpression()
-      case or@Or(_, NopExpression()) => NopExpression()
-      // For AND we replace it with the other expression, since the
-      // equi join predicate will always be true
-      case and@And(NopExpression(), other) => other
-      case and@And(other, NopExpression()) => other
-      case eq : EqualTo if equiJoinExprs.contains(eq) =>
-        NopExpression()
-    }
-
-    (resultExpr, joinFieldsLeft.toArray, joinFieldsRight.toArray)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9381c430/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/GroupByAnalyzer.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/GroupByAnalyzer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/GroupByAnalyzer.scala
deleted file mode 100644
index 6c7ecb2..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/GroupByAnalyzer.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.expressions.analysis
-
-import org.apache.flink.api.table._
-import org.apache.flink.api.table.expressions.{ResolvedFieldReference, Expression}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-
-import scala.collection.mutable
-
-import org.apache.flink.api.table.trees.{Rule, Analyzer}
-
-
-/**
- * Analyzer for grouping expressions. Only field expressions are allowed as grouping expressions.
- */
-class GroupByAnalyzer(inputFields: Seq[(String, TypeInformation[_])])
-  extends Analyzer[Expression] {
-
-  def rules = Seq(new ResolveFieldReferences(inputFields), CheckGroupExpression)
-
-  object CheckGroupExpression extends Rule[Expression] {
-
-    def apply(expr: Expression) = {
-      val errors = mutable.MutableList[String]()
-
-      expr match {
-        case f: ResolvedFieldReference => // this is OK
-        case other =>
-          throw new ExpressionException(
-            s"""Invalid grouping expression "$expr". Only field references are allowed.""")
-      }
-      expr
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9381c430/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/InsertAutoCasts.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/InsertAutoCasts.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/InsertAutoCasts.scala
deleted file mode 100644
index 0fdcab6..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/InsertAutoCasts.scala
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.expressions.analysis
-
-import org.apache.flink.api.table.expressions._
-import org.apache.flink.api.common.typeinfo.{IntegerTypeInfo, BasicTypeInfo}
-import org.apache.flink.api.table.trees.Rule
-
-/**
- * [[Rule]] that adds casts in arithmetic operations.
- */
-class InsertAutoCasts extends Rule[Expression] {
-
-  def apply(expr: Expression) = {
-    val result = expr.transformPost {
-
-      case plus@Plus(o1, o2) =>
-        // Plus is special case since we can cast anything to String for String concat
-        if (o1.typeInfo != o2.typeInfo && o1.typeInfo.isBasicType && o2.typeInfo.isBasicType) {
-          if (o1.typeInfo.asInstanceOf[BasicTypeInfo[_]].shouldAutocastTo(
-            o2.typeInfo.asInstanceOf[BasicTypeInfo[_]])) {
-            Plus(Cast(o1, o2.typeInfo), o2)
-          } else if (o2.typeInfo.asInstanceOf[BasicTypeInfo[_]].shouldAutocastTo(
-            o1.typeInfo.asInstanceOf[BasicTypeInfo[_]])) {
-            Plus(o1, Cast(o2, o1.typeInfo))
-          } else if (o1.typeInfo == BasicTypeInfo.STRING_TYPE_INFO) {
-            Plus(o1, Cast(o2, BasicTypeInfo.STRING_TYPE_INFO))
-          } else if (o2.typeInfo == BasicTypeInfo.STRING_TYPE_INFO) {
-            Plus(Cast(o1, BasicTypeInfo.STRING_TYPE_INFO), o2)
-          } else {
-            plus
-          }
-        } else {
-          plus
-        }
-
-      case ba: BinaryExpression if ba.isInstanceOf[BinaryArithmetic] ||
-        ba.isInstanceOf[BinaryComparison] =>
-        val o1 = ba.left
-        val o2 = ba.right
-        if (o1.typeInfo != o2.typeInfo && o1.typeInfo.isBasicType && o2.typeInfo.isBasicType) {
-          if (o1.typeInfo.asInstanceOf[BasicTypeInfo[_]].shouldAutocastTo(
-            o2.typeInfo.asInstanceOf[BasicTypeInfo[_]])) {
-            ba.makeCopy(Seq(Cast(o1, o2.typeInfo), o2))
-          } else if (o2.typeInfo.asInstanceOf[BasicTypeInfo[_]].shouldAutocastTo(
-            o1.typeInfo.asInstanceOf[BasicTypeInfo[_]])) {
-            ba.makeCopy(Seq(o1, Cast(o2, o1.typeInfo)))
-          } else {
-            ba
-          }
-        } else {
-          ba
-        }
-
-      case ba: BinaryExpression if ba.isInstanceOf[BitwiseBinaryArithmetic] =>
-        val o1 = ba.left
-        val o2 = ba.right
-        if (o1.typeInfo != o2.typeInfo && o1.typeInfo.isInstanceOf[IntegerTypeInfo[_]] &&
-          o2.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) {
-          if (o1.typeInfo.asInstanceOf[BasicTypeInfo[_]].shouldAutocastTo(
-            o2.typeInfo.asInstanceOf[BasicTypeInfo[_]])) {
-            ba.makeCopy(Seq(Cast(o1, o2.typeInfo), o2))
-          } else if (o2.typeInfo.asInstanceOf[BasicTypeInfo[_]].shouldAutocastTo(
-            o1.typeInfo.asInstanceOf[BasicTypeInfo[_]])) {
-            ba.makeCopy(Seq(o1, Cast(o2, o1.typeInfo)))
-          } else {
-            ba
-          }
-        } else {
-          ba
-        }
-    }
-
-    result
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9381c430/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/PredicateAnalyzer.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/PredicateAnalyzer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/PredicateAnalyzer.scala
deleted file mode 100644
index e9236f7..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/PredicateAnalyzer.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.expressions.analysis
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.expressions.Expression
-import org.apache.flink.api.table.trees.Analyzer
-
-/**
- * Analyzer for predicates, i.e. filter operations and where clauses of joins.
- */
-class PredicateAnalyzer(inputFields: Seq[(String, TypeInformation[_])])
-  extends Analyzer[Expression] {
-  def rules = Seq(
-    new ResolveFieldReferences(inputFields),
-    new InsertAutoCasts,
-    new TypeCheck,
-    new VerifyNoAggregates,
-    new VerifyBoolean)
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9381c430/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/ResolveFieldReferences.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/ResolveFieldReferences.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/ResolveFieldReferences.scala
deleted file mode 100644
index db7ea6c..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/ResolveFieldReferences.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.expressions.analysis
-
-import org.apache.flink.api.table.expressions.{ResolvedFieldReference,
-UnresolvedFieldReference, Expression}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table._
-
-import scala.collection.mutable
-
-import org.apache.flink.api.table.trees.Rule
-
-/**
- * Rule that resolved field references. This rule verifies that field references point to existing
- * fields of the input operation and creates [[ResolvedFieldReference]]s that hold the field
- * [[TypeInformation]] in addition to the field name.
- */
-class ResolveFieldReferences(inputFields: Seq[(String, TypeInformation[_])])
-  extends Rule[Expression] {
-
-  def apply(expr: Expression) = {
-    val errors = mutable.MutableList[String]()
-
-    val result = expr.transformPost {
-      case fe@UnresolvedFieldReference(fieldName) =>
-        inputFields.find { _._1 == fieldName } match {
-          case Some((_, tpe)) => ResolvedFieldReference(fieldName, tpe)
-
-          case None =>
-            errors +=
-              s"Field '$fieldName' is not valid for input fields ${inputFields.mkString(",")}"
-            fe
-        }
-    }
-
-    if (errors.length > 0) {
-      throw new ExpressionException(
-        s"""Invalid expression "$expr": ${errors.mkString(" ")}""")
-    }
-
-    result
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9381c430/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/SelectionAnalyzer.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/SelectionAnalyzer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/SelectionAnalyzer.scala
deleted file mode 100644
index 625fdbf..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/SelectionAnalyzer.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.expressions.analysis
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.expressions.Expression
-import org.apache.flink.api.table.trees.Analyzer
-
-/**
- * This analyzes selection expressions.
- */
-class SelectionAnalyzer(inputFields: Seq[(String, TypeInformation[_])])
-  extends Analyzer[Expression] {
-
-  def rules = Seq(
-    new ResolveFieldReferences(inputFields),
-    new VerifyNoNestedAggregates,
-    new InsertAutoCasts,
-    new TypeCheck)
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9381c430/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/TypeCheck.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/TypeCheck.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/TypeCheck.scala
deleted file mode 100644
index b724561..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/TypeCheck.scala
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.expressions.analysis
-
-import org.apache.flink.api.table.expressions.Expression
-import org.apache.flink.api.table.trees.Rule
-import org.apache.flink.api.table.{_}
-
-import scala.collection.mutable
-
-/**
- * Rule that makes sure we call [[Expression.typeInfo]] on each [[Expression]] at least once.
- * Expressions are expected to perform type verification in this method.
- */
-class TypeCheck extends Rule[Expression] {
-
-  def apply(expr: Expression) = {
-    val errors = mutable.MutableList[String]()
-
-    val result = expr.transformPre {
-      case expr: Expression=> {
-        // simply get the typeInfo from the expression. this will perform type analysis
-        try {
-          expr.typeInfo
-        } catch {
-          case e: ExpressionException =>
-            errors += e.getMessage
-        }
-        expr
-      }
-    }
-
-    if (errors.length > 0) {
-      throw new ExpressionException(
-        s"""Invalid expression "$expr": ${errors.mkString(" ")}""")
-    }
-
-    result
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9381c430/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyBoolean.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyBoolean.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyBoolean.scala
deleted file mode 100644
index e75dd20..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyBoolean.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.expressions.analysis
-
-import org.apache.flink.api.table.expressions.{NopExpression, Expression}
-import org.apache.flink.api.table.trees.Rule
-import org.apache.flink.api.table.{_}
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-
-import scala.collection.mutable
-
-/**
- * [[Rule]] that verifies that the result type of an [[Expression]] is Boolean. This is required
- * for filter/join predicates.
- */
-class VerifyBoolean extends Rule[Expression] {
-
-  def apply(expr: Expression) = {
-    if (!expr.isInstanceOf[NopExpression] && expr.typeInfo != BasicTypeInfo.BOOLEAN_TYPE_INFO) {
-      throw new ExpressionException(s"Expression $expr of type ${expr.typeInfo} is not boolean.")
-    }
-
-    expr
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9381c430/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyNoAggregates.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyNoAggregates.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyNoAggregates.scala
deleted file mode 100644
index 09dbf88..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyNoAggregates.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.expressions.analysis
-
-import org.apache.flink.api.table.ExpressionException
-import org.apache.flink.api.table.expressions.{Aggregation, Expression}
-
-import scala.collection.mutable
-
-import org.apache.flink.api.table.trees.Rule
-
-/**
- * Rule that verifies that an expression does not contain aggregate operations. Right now, join
- * predicates and filter predicates cannot contain aggregates.
- */
-class VerifyNoAggregates extends Rule[Expression] {
-
-  def apply(expr: Expression) = {
-    val errors = mutable.MutableList[String]()
-
-    val result = expr.transformPre {
-      case agg: Aggregation=> {
-        errors +=
-          s"""Aggregations are not allowed in join/filter predicates."""
-        agg
-      }
-    }
-
-    if (errors.length > 0) {
-      throw new ExpressionException(
-        s"""Invalid expression "$expr": ${errors.mkString(" ")}""")
-    }
-
-    result
-
-  }
-}


Mime
View raw message