flink-commits mailing list archives

From va...@apache.org
Subject [40/50] [abbrv] flink git commit: [FLINK-3489] TableAPI refactoring and cleanup
Date Fri, 18 Mar 2016 13:48:34 GMT
[FLINK-3489] TableAPI refactoring and cleanup

This closes #1789


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/63c6dad4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/63c6dad4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/63c6dad4

Branch: refs/heads/master
Commit: 63c6dad4d8e78c50d9e170f605908d44e9918df6
Parents: 316932c
Author: vasia <vasia@apache.org>
Authored: Mon Mar 14 14:16:29 2016 +0100
Committer: vasia <vasia@apache.org>
Committed: Fri Mar 18 14:44:51 2016 +0100

----------------------------------------------------------------------
 .../api/java/table/JavaBatchTranslator.scala    |  11 -
 .../flink/api/table/ExpressionException.scala   |  23 -
 .../api/table/ExpressionParserException.scala   |  23 +
 .../flink/api/table/codegen/CodeGenUtils.scala  |   2 +-
 .../flink/api/table/codegen/CodeGenerator.scala |   4 +-
 .../api/table/expressions/Expression.scala      |   7 -
 .../table/expressions/ExpressionParser.scala    | 282 +++++++++++++
 .../flink/api/table/expressions/TreeNode.scala  | 120 ++++++
 .../api/table/expressions/aggregations.scala    |  19 -
 .../api/table/expressions/arithmetic.scala      |  45 +-
 .../flink/api/table/expressions/call.scala      |   2 -
 .../flink/api/table/expressions/cast.scala      |  11 +-
 .../api/table/expressions/comparison.scala      |  43 +-
 .../api/table/expressions/fieldExpression.scala |  11 +-
 .../flink/api/table/expressions/logic.scala     |  20 +-
 .../api/table/parser/ExpressionParser.scala     | 281 -------------
 .../flink/api/table/plan/PlanTranslator.scala   |   3 +-
 .../api/table/plan/RexNodeTranslator.scala      |   3 +-
 .../flink/api/table/plan/TypeConverter.scala    | 203 ---------
 .../plan/nodes/dataset/DataSetAggregate.scala   |   4 +-
 .../table/plan/nodes/dataset/DataSetCalc.scala  |   3 +-
 .../table/plan/nodes/dataset/DataSetJoin.scala  |   3 +-
 .../plan/nodes/dataset/DataSetSource.scala      |   3 +-
 .../api/table/plan/schema/DataSetTable.scala    |   2 +-
 .../table/runtime/aggregate/AggregateUtil.scala |   7 +-
 .../org/apache/flink/api/table/table.scala      |   3 +-
 .../apache/flink/api/table/trees/TreeNode.scala | 120 ------
 .../table/typeinfo/NullAwareComparator.scala    | 218 ----------
 .../api/table/typeinfo/NullMaskUtils.scala      |  98 -----
 .../api/table/typeinfo/RowComparator.scala      | 417 -------------------
 .../api/table/typeinfo/RowSerializer.scala      | 209 ----------
 .../flink/api/table/typeinfo/RowTypeInfo.scala  | 114 -----
 .../table/typeutils/NullAwareComparator.scala   | 218 ++++++++++
 .../api/table/typeutils/NullMaskUtils.scala     |  98 +++++
 .../api/table/typeutils/RowComparator.scala     | 417 +++++++++++++++++++
 .../api/table/typeutils/RowSerializer.scala     | 209 ++++++++++
 .../flink/api/table/typeutils/RowTypeInfo.scala | 114 +++++
 .../api/table/typeutils/TypeConverter.scala     | 196 +++++++++
 .../api/java/table/test/AggregationsITCase.java |   4 +-
 .../scala/table/test/AggregationsITCase.scala   |   2 +-
 .../api/scala/table/test/UnionITCase.scala      |   2 +-
 .../api/table/test/ScalarFunctionsTest.scala    |   5 +-
 .../api/table/typeinfo/RowComparatorTest.scala  | 134 ------
 .../api/table/typeinfo/RowSerializerTest.scala  | 194 ---------
 .../api/table/typeutils/RowComparatorTest.scala | 134 ++++++
 .../api/table/typeutils/RowSerializerTest.scala | 194 +++++++++
 46 files changed, 2036 insertions(+), 2199 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/63c6dad4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
index dbbe7e8..1f4e803 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
@@ -64,12 +64,6 @@ class JavaBatchTranslator(config: TableConfig) extends PlanTranslator {
     // get the planner for the plan
     val planner = lPlan.getCluster.getPlanner
 
-
-    println("-----------")
-    println("Input Plan:")
-    println("-----------")
-    println(RelOptUtil.toString(lPlan))
-
     // decorrelate
     val decorPlan = RelDecorrelator.decorrelateQuery(lPlan)
 
@@ -92,11 +86,6 @@ class JavaBatchTranslator(config: TableConfig) extends PlanTranslator {
         throw a.getCause
     }
 
-    println("-------------")
-    println("DataSet Plan:")
-    println("-------------")
-    println(RelOptUtil.toString(dataSetPlan))
-
     dataSetPlan match {
       case node: DataSetRel =>
         node.translateToPlan(

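Note: the deleted blocks dumped the logical input plan and the optimized DataSet plan to stdout on every translation. A minimal sketch of how such plan dumps are typically kept available without polluting stdout, assuming an SLF4J logger (the logger and its placement are not part of this commit):

    import org.slf4j.LoggerFactory
    import org.apache.calcite.plan.RelOptUtil

    // hypothetical debug logging inside JavaBatchTranslator
    private val LOG = LoggerFactory.getLogger(classOf[JavaBatchTranslator])

    if (LOG.isDebugEnabled) {
      LOG.debug("Input Plan:\n" + RelOptUtil.toString(lPlan))
      LOG.debug("DataSet Plan:\n" + RelOptUtil.toString(dataSetPlan))
    }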
http://git-wip-us.apache.org/repos/asf/flink/blob/63c6dad4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/ExpressionException.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/ExpressionException.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/ExpressionException.scala
deleted file mode 100644
index 51c0a4d..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/ExpressionException.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table
-
-/**
- * Exception for all errors occurring during expression evaluation.
- */
-class ExpressionException(msg: String) extends RuntimeException(msg)

http://git-wip-us.apache.org/repos/asf/flink/blob/63c6dad4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/ExpressionParserException.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/ExpressionParserException.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/ExpressionParserException.scala
new file mode 100644
index 0000000..2d6fae6
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/ExpressionParserException.scala
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table
+
+/**
+ * Exception for all errors occurring during expression parsing.
+ */
+class ExpressionParserException(msg: String) extends RuntimeException(msg)

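A usage sketch for the renamed exception (not part of this commit): code that feeds user-supplied strings to the expression parser can now catch the parser-specific type directly.

    import org.apache.flink.api.table.ExpressionParserException
    import org.apache.flink.api.table.expressions.ExpressionParser

    try {
      ExpressionParser.parseExpression("a + ")  // deliberately malformed
    } catch {
      case e: ExpressionParserException =>
        println(e.getMessage)  // "Could not parse expression: ..."
    }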
http://git-wip-us.apache.org/repos/asf/flink/blob/63c6dad4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala
index a8ad9ad..a24d74f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala
@@ -27,7 +27,7 @@ import org.apache.flink.api.common.typeinfo.{NumericTypeInfo, TypeInformation}
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.api.java.typeutils.{TypeExtractor, PojoTypeInfo, TupleTypeInfo}
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
-import org.apache.flink.api.table.typeinfo.RowTypeInfo
+import org.apache.flink.api.table.typeutils.RowTypeInfo
 
 object CodeGenUtils {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/63c6dad4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
index 5ebd4c3..de6860f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
@@ -32,8 +32,8 @@ import org.apache.flink.api.table.codegen.CodeGenUtils._
 import org.apache.flink.api.table.codegen.Indenter.toISC
 import org.apache.flink.api.table.codegen.calls.ScalarFunctions
 import org.apache.flink.api.table.codegen.calls.ScalarOperators._
-import org.apache.flink.api.table.plan.TypeConverter.sqlTypeToTypeInfo
-import org.apache.flink.api.table.typeinfo.RowTypeInfo
+import org.apache.flink.api.table.typeutils.{TypeConverter, RowTypeInfo}
+import TypeConverter.sqlTypeToTypeInfo
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable

http://git-wip-us.apache.org/repos/asf/flink/blob/63c6dad4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/Expression.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/Expression.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/Expression.scala
index 900ed8a..cd278d0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/Expression.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/Expression.scala
@@ -18,16 +18,10 @@
 package org.apache.flink.api.table.expressions
 
 import java.util.concurrent.atomic.AtomicInteger
-
 import scala.language.postfixOps
 
-import org.apache.flink.api.common.typeinfo.{NothingTypeInfo, TypeInformation}
-import org.apache.flink.api.table.trees.TreeNode
-
-
 abstract class Expression extends TreeNode[Expression] { self: Product =>
   def name: String = Expression.freshName("expression")
-  def typeInfo: TypeInformation[_]
 }
 
 abstract class BinaryExpression() extends Expression { self: Product =>
@@ -46,7 +40,6 @@ abstract class LeafExpression() extends Expression { self: Product =>
 }
 
 case class NopExpression() extends LeafExpression {
-  val typeInfo = new NothingTypeInfo()
   override val name = Expression.freshName("nop")
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/63c6dad4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
new file mode 100644
index 0000000..4c88249
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.expressions
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.table.ExpressionParserException
+
+import scala.util.parsing.combinator.{JavaTokenParsers, PackratParsers}
+
+/**
+ * Parser for expressions inside a String. This parses exactly the same expressions that
+ * would be accepted by the Scala Expression DSL.
+ *
+ * See [[org.apache.flink.api.scala.table.ImplicitExpressionConversions]] and
+ * [[org.apache.flink.api.scala.table.ImplicitExpressionOperations]] for the constructs
+ * available in the Scala Expression DSL. This parser must be kept in sync with the Scala DSL
+ * defined in the above files.
+ */
+object ExpressionParser extends JavaTokenParsers with PackratParsers {
+  case class Keyword(key: String)
+
+  // Convert the keyword into a case-insensitive Parser
+  implicit def keyword2Parser(kw: Keyword): Parser[String] = {
+    ("""(?i)\Q""" + kw.key + """\E""").r
+  }
+
+  // Keyword
+
+  lazy val AS: Keyword = Keyword("as")
+  lazy val COUNT: Keyword = Keyword("count")
+  lazy val AVG: Keyword = Keyword("avg")
+  lazy val MIN: Keyword = Keyword("min")
+  lazy val MAX: Keyword = Keyword("max")
+  lazy val SUM: Keyword = Keyword("sum")
+
+  // Literals
+
+  lazy val numberLiteral: PackratParser[Expression] =
+    ((wholeNumber <~ ("L" | "l")) | floatingPointNumber | decimalNumber | wholeNumber) ^^ {
+      str =>
+        if (str.endsWith("L") || str.endsWith("l")) {
+          Literal(str.toLong)
+        } else if (str.matches("""-?\d+""")) {
+          Literal(str.toInt)
+        } else if (str.endsWith("f") | str.endsWith("F")) {
+          Literal(str.toFloat)
+        } else {
+          Literal(str.toDouble)
+        }
+    }
+
+  lazy val singleQuoteStringLiteral: Parser[Expression] =
+    ("'" + """([^'\p{Cntrl}\\]|\\[\\'"bfnrt]|\\u[a-fA-F0-9]{4})*""" + "'").r ^^ {
+      str => Literal(str.substring(1, str.length - 1))
+    }
+
+  lazy val stringLiteralFlink: PackratParser[Expression] = super.stringLiteral ^^ {
+    str => Literal(str.substring(1, str.length - 1))
+  }
+
+  lazy val boolLiteral: PackratParser[Expression] = ("true" | "false") ^^ {
+    str => Literal(str.toBoolean)
+  }
+
+  lazy val literalExpr: PackratParser[Expression] =
+    numberLiteral |
+      stringLiteralFlink | singleQuoteStringLiteral |
+      boolLiteral
+
+  lazy val fieldReference: PackratParser[Expression] = ident ^^ {
+    case sym => UnresolvedFieldReference(sym)
+  }
+
+  lazy val atom: PackratParser[Expression] =
+    ( "(" ~> expression <~ ")" ) | literalExpr | fieldReference
+
+  // suffix operators
+
+  lazy val isNull: PackratParser[Expression] = atom <~ ".isNull" ^^ { e => IsNull(e) }
+  lazy val isNotNull: PackratParser[Expression] = atom <~ ".isNotNull" ^^ { e => IsNotNull(e) }
+
+
+  lazy val sum: PackratParser[Expression] =
+    (atom <~ ".sum" ^^ { e => Sum(e) }) | (SUM ~ "(" ~> atom <~ ")" ^^ { e => Sum(e) })
+  lazy val min: PackratParser[Expression] =
+    (atom <~ ".min" ^^ { e => Min(e) }) | (MIN ~ "(" ~> atom <~ ")" ^^ { e => Min(e) })
+  lazy val max: PackratParser[Expression] =
+    (atom <~ ".max" ^^ { e => Max(e) }) | (MAX ~ "(" ~> atom <~ ")" ^^ { e => Max(e) })
+  lazy val count: PackratParser[Expression] =
+    (atom <~ ".count" ^^ { e => Count(e) }) | (COUNT ~ "(" ~> atom <~ ")" ^^ { e => Count(e) })
+  lazy val avg: PackratParser[Expression] =
+    (atom <~ ".avg" ^^ { e => Avg(e) }) | (AVG ~ "(" ~> atom <~ ")" ^^ { e => Avg(e) })
+
+  lazy val cast: PackratParser[Expression] =
+    atom <~ ".cast(BYTE)" ^^ { e => Cast(e, BasicTypeInfo.BYTE_TYPE_INFO) } |
+    atom <~ ".cast(SHORT)" ^^ { e => Cast(e, BasicTypeInfo.SHORT_TYPE_INFO) } |
+    atom <~ ".cast(INT)" ^^ { e => Cast(e, BasicTypeInfo.INT_TYPE_INFO) } |
+    atom <~ ".cast(LONG)" ^^ { e => Cast(e, BasicTypeInfo.LONG_TYPE_INFO) } |
+    atom <~ ".cast(FLOAT)" ^^ { e => Cast(e, BasicTypeInfo.FLOAT_TYPE_INFO) } |
+    atom <~ ".cast(DOUBLE)" ^^ { e => Cast(e, BasicTypeInfo.DOUBLE_TYPE_INFO) } |
+    atom <~ ".cast(BOOL)" ^^ { e => Cast(e, BasicTypeInfo.BOOLEAN_TYPE_INFO) } |
+    atom <~ ".cast(BOOLEAN)" ^^ { e => Cast(e, BasicTypeInfo.BOOLEAN_TYPE_INFO) } |
+    atom <~ ".cast(STRING)" ^^ { e => Cast(e, BasicTypeInfo.STRING_TYPE_INFO) } |
+    atom <~ ".cast(DATE)" ^^ { e => Cast(e, BasicTypeInfo.DATE_TYPE_INFO) }
+
+  lazy val as: PackratParser[Expression] = atom ~ ".as(" ~ fieldReference ~ ")" ^^ {
+    case e ~ _ ~ target ~ _ => Naming(e, target.name)
+  }
+
+  // general function calls
+
+  lazy val functionCall = ident ~ "(" ~ rep1sep(expression, ",") ~ ")" ^^ {
+    case name ~ _ ~ args ~ _ => Call(name.toUpperCase, args: _*)
+  }
+
+  lazy val functionCallWithoutArgs = ident ~ "()" ^^ {
+    case name ~ _ => Call(name.toUpperCase)
+  }
+
+  lazy val suffixFunctionCall = atom ~ "." ~ ident ~ "(" ~ rep1sep(expression, ",") ~ ")" ^^ {
+    case operand ~ _ ~ name ~ _ ~ args ~ _ => Call(name.toUpperCase, operand :: args : _*)
+  }
+
+  lazy val suffixFunctionCallWithoutArgs = atom ~ "." ~ ident ~ "()" ^^ {
+    case operand ~ _ ~ name ~ _ => Call(name.toUpperCase, operand)
+  }
+
+  // special calls
+
+  lazy val specialFunctionCalls = trim | trimWithoutArgs
+
+  lazy val specialSuffixFunctionCalls = suffixTrim | suffixTrimWithoutArgs
+
+  lazy val trimWithoutArgs = "trim(" ~ expression ~ ")" ^^ {
+    case _ ~ operand ~ _ =>
+      Call(
+        BuiltInFunctionNames.TRIM,
+        BuiltInFunctionConstants.TRIM_BOTH,
+        BuiltInFunctionConstants.TRIM_DEFAULT_CHAR,
+        operand)
+  }
+
+  lazy val suffixTrimWithoutArgs = atom ~ ".trim()" ^^ {
+    case operand ~ _ =>
+      Call(
+        BuiltInFunctionNames.TRIM,
+        BuiltInFunctionConstants.TRIM_BOTH,
+        BuiltInFunctionConstants.TRIM_DEFAULT_CHAR,
+        operand)
+  }
+
+  lazy val trim = "trim(" ~ ("BOTH" | "LEADING" | "TRAILING") ~ "," ~ expression ~
+      "," ~ expression ~ ")" ^^ {
+    case _ ~ trimType ~ _ ~ trimCharacter ~ _ ~ operand ~ _ =>
+      val flag = trimType match {
+        case "BOTH" => BuiltInFunctionConstants.TRIM_BOTH
+        case "LEADING" => BuiltInFunctionConstants.TRIM_LEADING
+        case "TRAILING" => BuiltInFunctionConstants.TRIM_TRAILING
+      }
+      Call(BuiltInFunctionNames.TRIM, flag, trimCharacter, operand)
+  }
+
+  lazy val suffixTrim = atom ~ ".trim(" ~ ("BOTH" | "LEADING" | "TRAILING") ~ "," ~
+      expression ~ ")" ^^ {
+    case operand ~ _ ~ trimType ~ _ ~ trimCharacter ~ _ =>
+      val flag = trimType match {
+        case "BOTH" => BuiltInFunctionConstants.TRIM_BOTH
+        case "LEADING" => BuiltInFunctionConstants.TRIM_LEADING
+        case "TRAILING" => BuiltInFunctionConstants.TRIM_TRAILING
+      }
+      Call(BuiltInFunctionNames.TRIM, flag, trimCharacter, operand)
+  }
+
+  lazy val suffix =
+    isNull | isNotNull |
+      sum | min | max | count | avg | cast |
+      specialFunctionCalls | functionCall | functionCallWithoutArgs |
+      specialSuffixFunctionCalls | suffixFunctionCall | suffixFunctionCallWithoutArgs |
+      atom
+
+  // unary ops
+
+  lazy val unaryNot: PackratParser[Expression] = "!" ~> suffix ^^ { e => Not(e) }
+
+  lazy val unaryMinus: PackratParser[Expression] = "-" ~> suffix ^^ { e => UnaryMinus(e) }
+
+  lazy val unary = unaryNot | unaryMinus | suffix
+
+  // arithmetic
+
+  lazy val product = unary * (
+    "*" ^^^ { (a:Expression, b:Expression) => Mul(a,b) } |
+      "/" ^^^ { (a:Expression, b:Expression) => Div(a,b) } |
+      "%" ^^^ { (a:Expression, b:Expression) => Mod(a,b) } )
+
+  lazy val term = product * (
+    "+" ^^^ { (a:Expression, b:Expression) => Plus(a,b) } |
+     "-" ^^^ { (a:Expression, b:Expression) => Minus(a,b) } )
+
+  // Comparison
+
+  lazy val equalTo: PackratParser[Expression] = term ~ ("===" | "=") ~ term ^^ {
+    case l ~ _ ~ r => EqualTo(l, r)
+  }
+
+  lazy val notEqualTo: PackratParser[Expression] = term ~ ("!==" | "!=" | "<>") ~ term ^^ {
+    case l ~ _ ~ r => NotEqualTo(l, r)
+  }
+
+  lazy val greaterThan: PackratParser[Expression] = term ~ ">" ~ term ^^ {
+    case l ~ _ ~ r => GreaterThan(l, r)
+  }
+
+  lazy val greaterThanOrEqual: PackratParser[Expression] = term ~ ">=" ~ term ^^ {
+    case l ~ _ ~ r => GreaterThanOrEqual(l, r)
+  }
+
+  lazy val lessThan: PackratParser[Expression] = term ~ "<" ~ term ^^ {
+    case l ~ _ ~ r => LessThan(l, r)
+  }
+
+  lazy val lessThanOrEqual: PackratParser[Expression] = term ~ "<=" ~ term ^^ {
+    case l ~ _ ~ r => LessThanOrEqual(l, r)
+  }
+
+  lazy val comparison: PackratParser[Expression] =
+      equalTo | notEqualTo |
+      greaterThan | greaterThanOrEqual |
+      lessThan | lessThanOrEqual | term
+
+  // logic
+
+  lazy val logic = comparison * (
+    "&&" ^^^ { (a:Expression, b:Expression) => And(a,b) } |
+      "||" ^^^ { (a:Expression, b:Expression) => Or(a,b) } )
+
+  // alias
+
+  lazy val alias: PackratParser[Expression] = logic ~ AS ~ fieldReference ^^ {
+    case e ~ _ ~ name => Naming(e, name.name)
+  } | logic
+
+  lazy val expression: PackratParser[Expression] = alias
+
+  lazy val expressionList: Parser[List[Expression]] = rep1sep(expression, ",")
+
+  def parseExpressionList(expression: String): List[Expression] = {
+    parseAll(expressionList, expression) match {
+      case Success(lst, _) => lst
+
+      case Failure(msg, _) => throw new ExpressionParserException(
+        "Could not parse expression: " + msg)
+
+      case Error(msg, _) => throw new ExpressionParserException(
+        "Could not parse expression: " + msg)
+    }
+  }
+
+  def parseExpression(exprString: String): Expression = {
+    parseAll(expression, exprString) match {
+      case Success(lst, _) => lst
+
+      case fail =>
+        throw new ExpressionParserException("Could not parse expression: " + fail.toString)
+    }
+  }
+}

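For reference, a short sketch of what the relocated parser accepts; the expressions mirror the Scala Table API DSL, and the inputs below are illustrative only:

    import org.apache.flink.api.table.expressions.ExpressionParser

    // arithmetic, comparison, and aliasing in one expression
    val e1 = ExpressionParser.parseExpression("a + 5 > b as isBigger")

    // aggregations in suffix or function-call form
    val e2 = ExpressionParser.parseExpression("a.sum")
    val e3 = ExpressionParser.parseExpression("sum(a)")

    // a comma-separated list, e.g. the argument of select(...)
    val fields = ExpressionParser.parseExpressionList("a, b as c, count(d)")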
http://git-wip-us.apache.org/repos/asf/flink/blob/63c6dad4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/TreeNode.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/TreeNode.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/TreeNode.scala
new file mode 100644
index 0000000..9d4ca80
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/TreeNode.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.expressions
+
+/**
+ * Generic base class for trees that can be transformed and traversed.
+ */
+abstract class TreeNode[A <: TreeNode[A]] { self: A with Product =>
+
+  /**
+   * List of child nodes that should be considered when doing transformations. Other values
+   * in the Product will not be transformed, only handed through.
+   */
+  def children: Seq[A]
+
+  /**
+   * Tests for equality by first testing for reference equality.
+   */
+  def fastEquals(other: TreeNode[_]): Boolean = this.eq(other) || this == other
+
+  def transformPre(rule: PartialFunction[A, A]): A = {
+    val afterTransform = rule.applyOrElse(this, identity[A])
+
+    if (afterTransform fastEquals this) {
+      this.transformChildrenPre(rule)
+    } else {
+      afterTransform.transformChildrenPre(rule)
+    }
+  }
+
+  def transformChildrenPre(rule: PartialFunction[A, A]): A = {
+    var changed = false
+    val newArgs = productIterator map {
+      case child: A if children.contains(child) =>
+        val newChild = child.transformPre(rule)
+        if (newChild fastEquals child) {
+          child
+        } else {
+          changed = true
+          newChild
+        }
+      case other: AnyRef => other
+      case null => null
+    } toArray
+
+    if (changed) makeCopy(newArgs) else this
+  }
+
+  def transformPost(rule: PartialFunction[A, A]): A = {
+    val afterChildren = transformChildrenPost(rule)
+    if (afterChildren fastEquals this) {
+      rule.applyOrElse(this, identity[A])
+    } else {
+      rule.applyOrElse(afterChildren, identity[A])
+    }
+  }
+
+  def transformChildrenPost(rule: PartialFunction[A, A]): A = {
+    var changed = false
+    val newArgs = productIterator map {
+      case child: A if children.contains(child) =>
+        val newChild = child.transformPost(rule)
+        if (newChild fastEquals child) {
+          child
+        } else {
+          changed = true
+          newChild
+        }
+      case other: AnyRef => other
+      case null => null
+    } toArray
+    // toArray forces evaluation, toSeq does not seem to work here
+
+    if (changed) makeCopy(newArgs) else this
+  }
+
+  def exists(predicate: A => Boolean): Boolean = {
+    var exists = false
+    this.transformPre {
+      case e: A => if (predicate(e)) {
+        exists = true
+      }
+        e
+    }
+    exists
+  }
+
+  /**
+   * Creates a new copy of this expression with new children. This is used during transformation
+   * if children change. This must be overridden by tree nodes that don't have the Constructor
+   * arguments in the same order as the `children`.
+   */
+  def makeCopy(newArgs: Seq[AnyRef]): this.type = {
+    val defaultCtor =
+      this.getClass.getConstructors.find { _.getParameterTypes.size > 0}.head
+    try {
+      defaultCtor.newInstance(newArgs.toArray: _*).asInstanceOf[this.type]
+    } catch {
+      case iae: IllegalArgumentException =>
+        println("IAE " + this)
+        throw new RuntimeException("Should never happen.")
+    }
+  }
+}
+
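A small sketch of the transform methods on the relocated TreeNode, using the expression case classes from this package (the rewrite rule is illustrative):

    import org.apache.flink.api.table.expressions._

    // rename field reference "a" to "b", bottom-up
    val expr = Plus(UnresolvedFieldReference("a"), UnresolvedFieldReference("c"))

    val renamed = expr.transformPost {
      case UnresolvedFieldReference("a") => UnresolvedFieldReference("b")
    }
    // renamed == Plus(UnresolvedFieldReference("b"), UnresolvedFieldReference("c"))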

http://git-wip-us.apache.org/repos/asf/flink/blob/63c6dad4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala
index d2fbdff..d9d5fa8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala
@@ -17,26 +17,7 @@
  */
 package org.apache.flink.api.table.expressions
 
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.api.table.ExpressionException
-
-
 abstract sealed class Aggregation extends UnaryExpression { self: Product =>
-  def typeInfo = {
-    child.typeInfo match {
-      case BasicTypeInfo.LONG_TYPE_INFO => // ok
-      case BasicTypeInfo.INT_TYPE_INFO =>
-      case BasicTypeInfo.DOUBLE_TYPE_INFO =>
-      case BasicTypeInfo.FLOAT_TYPE_INFO =>
-      case BasicTypeInfo.BYTE_TYPE_INFO =>
-      case BasicTypeInfo.SHORT_TYPE_INFO =>
-      case _ =>
-      throw new ExpressionException(s"Unsupported type ${child.typeInfo} for " +
-        s"aggregation $this. Only numeric data types supported.")
-    }
-    child.typeInfo
-  }
-
   override def toString = s"Aggregate($child)"
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/63c6dad4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala
index 440d954..b0bfa86 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala
@@ -17,56 +17,13 @@
  */
 package org.apache.flink.api.table.expressions
 
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, NumericTypeInfo}
-import org.apache.flink.api.table.ExpressionException
-
-abstract class BinaryArithmetic extends BinaryExpression { self: Product =>
-  def typeInfo = {
-    if (!left.typeInfo.isInstanceOf[NumericTypeInfo[_]]) {
-      throw new ExpressionException(
-        s"""Non-numeric operand ${left} of type ${left.typeInfo} in $this""")
-    }
-    if (!right.typeInfo.isInstanceOf[NumericTypeInfo[_]]) {
-      throw new ExpressionException(
-        s"""Non-numeric operand "${right}" of type ${right.typeInfo} in $this""")
-    }
-    if (left.typeInfo != right.typeInfo) {
-      throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " +
-        s"${right.typeInfo} in $this")
-    }
-    left.typeInfo
-  }
-}
+abstract class BinaryArithmetic extends BinaryExpression { self: Product => }
 
 case class Plus(left: Expression, right: Expression) extends BinaryArithmetic {
-  override def typeInfo = {
-    if (!left.typeInfo.isInstanceOf[NumericTypeInfo[_]] &&
-      !(left.typeInfo == BasicTypeInfo.STRING_TYPE_INFO)) {
-      throw new ExpressionException(s"Non-numeric operand type ${left.typeInfo} in $this")
-    }
-    if (!right.typeInfo.isInstanceOf[NumericTypeInfo[_]] &&
-      !(right.typeInfo == BasicTypeInfo.STRING_TYPE_INFO)) {
-      throw new ExpressionException(s"Non-numeric operand type ${right.typeInfo} in $this")
-    }
-    if (left.typeInfo != right.typeInfo) {
-      throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " +
-        s"${right.typeInfo} in $this")
-    }
-    left.typeInfo
-  }
-
   override def toString = s"($left + $right)"
 }
 
 case class UnaryMinus(child: Expression) extends UnaryExpression {
-  def typeInfo = {
-    if (!child.typeInfo.isInstanceOf[NumericTypeInfo[_]]) {
-      throw new ExpressionException(
-        s"""Non-numeric operand ${child} of type ${child.typeInfo} in $this""")
-    }
-    child.typeInfo
-  }
-
   override def toString = s"-($child)"
 }
 

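With the eager typeInfo checks removed, the arithmetic nodes are plain tree constructors; operand validation now happens later, during planning and code generation, rather than at construction time. An illustrative before/after:

    // before: Plus.typeInfo threw ExpressionException for e.g. String + Int operands
    // after: the tree is always built; type errors surface during translation
    val p = Plus(UnresolvedFieldReference("amount"), Literal(1))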
http://git-wip-us.apache.org/repos/asf/flink/blob/63c6dad4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/call.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/call.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/call.scala
index 8f0b010..9f74414 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/call.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/call.scala
@@ -23,8 +23,6 @@ package org.apache.flink.api.table.expressions
   */
 case class Call(functionName: String, args: Expression*) extends Expression {
 
-  def typeInfo = ???
-
   override def children: Seq[Expression] = args
 
   override def toString = s"\\$functionName(${args.mkString(", ")})"

http://git-wip-us.apache.org/repos/asf/flink/blob/63c6dad4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala
index 8918234..eb97d04 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala
@@ -17,18 +17,9 @@
  */
 package org.apache.flink.api.table.expressions
 
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.table.ExpressionException
+import org.apache.flink.api.common.typeinfo.TypeInformation
 
 case class Cast(child: Expression, tpe: TypeInformation[_]) extends UnaryExpression {
-  def typeInfo = tpe match {
-    case BasicTypeInfo.STRING_TYPE_INFO => tpe
-
-    case b if b.isBasicType && child.typeInfo.isBasicType => tpe
-
-    case _ => throw new ExpressionException(
-      s"Invalid cast: $this. Casts are only valid betwixt primitive types.")
-  }
 
   override def toString = s"$child.cast($tpe)"
 

http://git-wip-us.apache.org/repos/asf/flink/blob/63c6dad4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala
index 687ea7a..d9e9198 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala
@@ -17,46 +17,13 @@
  */
 package org.apache.flink.api.table.expressions
 
-import org.apache.flink.api.table.ExpressionException
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, NumericTypeInfo}
-
-abstract class BinaryComparison extends BinaryExpression { self: Product =>
-  def typeInfo = {
-    if (!left.typeInfo.isInstanceOf[NumericTypeInfo[_]]) {
-      throw new ExpressionException(s"Non-numeric operand ${left} in $this")
-    }
-    if (!right.typeInfo.isInstanceOf[NumericTypeInfo[_]]) {
-      throw new ExpressionException(s"Non-numeric operand ${right} in $this")
-    }
-    if (left.typeInfo != right.typeInfo) {
-      throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " +
-        s"${right.typeInfo} in $this")
-    }
-    BasicTypeInfo.BOOLEAN_TYPE_INFO
-  }
-}
+abstract class BinaryComparison extends BinaryExpression { self: Product => }
 
 case class EqualTo(left: Expression, right: Expression) extends BinaryComparison {
-  override def typeInfo = {
-    if (left.typeInfo != right.typeInfo) {
-      throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " +
-        s"${right.typeInfo} in $this")
-    }
-    BasicTypeInfo.BOOLEAN_TYPE_INFO
-  }
-
   override def toString = s"$left === $right"
 }
 
 case class NotEqualTo(left: Expression, right: Expression) extends BinaryComparison {
-  override def typeInfo = {
-    if (left.typeInfo != right.typeInfo) {
-      throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " +
-        s"${right.typeInfo} in $this")
-    }
-    BasicTypeInfo.BOOLEAN_TYPE_INFO
-  }
-
   override def toString = s"$left !== $right"
 }
 
@@ -77,17 +44,9 @@ case class LessThanOrEqual(left: Expression, right: Expression) extends BinaryCo
 }
 
 case class IsNull(child: Expression) extends UnaryExpression {
-  def typeInfo = {
-    BasicTypeInfo.BOOLEAN_TYPE_INFO
-  }
-
   override def toString = s"($child).isNull"
 }
 
 case class IsNotNull(child: Expression) extends UnaryExpression {
-  def typeInfo = {
-    BasicTypeInfo.BOOLEAN_TYPE_INFO
-  }
-
   override def toString = s"($child).isNotNull"
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/63c6dad4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala
index d7195b4..f3cb77e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala
@@ -17,26 +17,17 @@
  */
 package org.apache.flink.api.table.expressions
 
-import org.apache.flink.api.table.ExpressionException
-import org.apache.flink.api.common.typeinfo.TypeInformation
-
 case class UnresolvedFieldReference(override val name: String) extends LeafExpression {
-  def typeInfo = throw new ExpressionException(s"Unresolved field reference: $this")
-
   override def toString = "\"" + name
 }
 
 case class ResolvedFieldReference(
-    override val name: String,
-    tpe: TypeInformation[_]) extends LeafExpression {
-  def typeInfo = tpe
+    override val name: String) extends LeafExpression {
 
   override def toString = s"'$name"
 }
 
 case class Naming(child: Expression, override val name: String) extends UnaryExpression {
-  def typeInfo = child.typeInfo
-
   override def toString = s"$child as '$name"
 
   override def makeCopy(anyRefs: Seq[AnyRef]): this.type = {

http://git-wip-us.apache.org/repos/asf/flink/blob/63c6dad4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/logic.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/logic.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/logic.scala
index eaf0463..3f9b5c2 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/logic.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/logic.scala
@@ -17,27 +17,9 @@
  */
 package org.apache.flink.api.table.expressions
 
-import org.apache.flink.api.table.ExpressionException
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-
-abstract class BinaryPredicate extends BinaryExpression { self: Product =>
-  def typeInfo = {
-    if (left.typeInfo != BasicTypeInfo.BOOLEAN_TYPE_INFO ||
-      right.typeInfo != BasicTypeInfo.BOOLEAN_TYPE_INFO) {
-      throw new ExpressionException(s"Non-boolean operand types ${left.typeInfo} and " +
-        s"${right.typeInfo} in $this")
-    }
-    BasicTypeInfo.BOOLEAN_TYPE_INFO
-  }
-}
+abstract class BinaryPredicate extends BinaryExpression { self: Product => }
 
 case class Not(child: Expression) extends UnaryExpression {
-  def typeInfo = {
-    if (child.typeInfo != BasicTypeInfo.BOOLEAN_TYPE_INFO) {
-      throw new ExpressionException(s"Non-boolean operand type ${child.typeInfo} in $this")
-    }
-    BasicTypeInfo.BOOLEAN_TYPE_INFO
-  }
 
   override val name = Expression.freshName("not-" + child.name)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/63c6dad4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala
deleted file mode 100644
index 7341f27..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala
+++ /dev/null
@@ -1,281 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.parser
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.api.table.ExpressionException
-import org.apache.flink.api.table.expressions._
-
-import scala.util.parsing.combinator.{PackratParsers, JavaTokenParsers}
-
-/**
- * Parser for expressions inside a String. This parses exactly the same expressions that
- * would be accepted by the Scala Expression DSL.
- *
- * See [[org.apache.flink.api.scala.table.ImplicitExpressionConversions]] and
- * [[org.apache.flink.api.scala.table.ImplicitExpressionOperations]] for the constructs
- * available in the Scala Expression DSL. This parser must be kept in sync with the Scala DSL
- * defined in the above files.
- */
-object ExpressionParser extends JavaTokenParsers with PackratParsers {
-  case class Keyword(key: String)
-
-  // Convert the keyword into a case-insensitive Parser
-  implicit def keyword2Parser(kw: Keyword): Parser[String] = {
-    ("""(?i)\Q""" + kw.key + """\E""").r
-  }
-
-  // Keyword
-
-  lazy val AS: Keyword = Keyword("as")
-  lazy val COUNT: Keyword = Keyword("count")
-  lazy val AVG: Keyword = Keyword("avg")
-  lazy val MIN: Keyword = Keyword("min")
-  lazy val MAX: Keyword = Keyword("max")
-  lazy val SUM: Keyword = Keyword("sum")
-
-  // Literals
-
-  lazy val numberLiteral: PackratParser[Expression] =
-    ((wholeNumber <~ ("L" | "l")) | floatingPointNumber | decimalNumber | wholeNumber) ^^ {
-      str =>
-        if (str.endsWith("L") || str.endsWith("l")) {
-          Literal(str.toLong)
-        } else if (str.matches("""-?\d+""")) {
-          Literal(str.toInt)
-        } else if (str.endsWith("f") | str.endsWith("F")) {
-          Literal(str.toFloat)
-        } else {
-          Literal(str.toDouble)
-        }
-    }
-
-  lazy val singleQuoteStringLiteral: Parser[Expression] =
-    ("'" + """([^'\p{Cntrl}\\]|\\[\\'"bfnrt]|\\u[a-fA-F0-9]{4})*""" + "'").r ^^ {
-      str => Literal(str.substring(1, str.length - 1))
-    }
-
-  lazy val stringLiteralFlink: PackratParser[Expression] = super.stringLiteral ^^ {
-    str => Literal(str.substring(1, str.length - 1))
-  }
-
-  lazy val boolLiteral: PackratParser[Expression] = ("true" | "false") ^^ {
-    str => Literal(str.toBoolean)
-  }
-
-  lazy val literalExpr: PackratParser[Expression] =
-    numberLiteral |
-      stringLiteralFlink | singleQuoteStringLiteral |
-      boolLiteral
-
-  lazy val fieldReference: PackratParser[Expression] = ident ^^ {
-    case sym => UnresolvedFieldReference(sym)
-  }
-
-  lazy val atom: PackratParser[Expression] =
-    ( "(" ~> expression <~ ")" ) | literalExpr | fieldReference
-
-  // suffix operators
-
-  lazy val isNull: PackratParser[Expression] = atom <~ ".isNull" ^^ { e => IsNull(e) }
-  lazy val isNotNull: PackratParser[Expression] = atom <~ ".isNotNull" ^^ { e => IsNotNull(e) }
-
-
-  lazy val sum: PackratParser[Expression] =
-    (atom <~ ".sum" ^^ { e => Sum(e) }) | (SUM ~ "(" ~> atom <~ ")" ^^ { e => Sum(e) })
-  lazy val min: PackratParser[Expression] =
-    (atom <~ ".min" ^^ { e => Min(e) }) | (MIN ~ "(" ~> atom <~ ")" ^^ { e => Min(e) })
-  lazy val max: PackratParser[Expression] =
-    (atom <~ ".max" ^^ { e => Max(e) }) | (MAX ~ "(" ~> atom <~ ")" ^^ { e => Max(e) })
-  lazy val count: PackratParser[Expression] =
-    (atom <~ ".count" ^^ { e => Count(e) }) | (COUNT ~ "(" ~> atom <~ ")" ^^ { e => Count(e) })
-  lazy val avg: PackratParser[Expression] =
-    (atom <~ ".avg" ^^ { e => Avg(e) }) | (AVG ~ "(" ~> atom <~ ")" ^^ { e => Avg(e) })
-
-  lazy val cast: PackratParser[Expression] =
-    atom <~ ".cast(BYTE)" ^^ { e => Cast(e, BasicTypeInfo.BYTE_TYPE_INFO) } |
-    atom <~ ".cast(SHORT)" ^^ { e => Cast(e, BasicTypeInfo.SHORT_TYPE_INFO) } |
-    atom <~ ".cast(INT)" ^^ { e => Cast(e, BasicTypeInfo.INT_TYPE_INFO) } |
-    atom <~ ".cast(LONG)" ^^ { e => Cast(e, BasicTypeInfo.LONG_TYPE_INFO) } |
-    atom <~ ".cast(FLOAT)" ^^ { e => Cast(e, BasicTypeInfo.FLOAT_TYPE_INFO) } |
-    atom <~ ".cast(DOUBLE)" ^^ { e => Cast(e, BasicTypeInfo.DOUBLE_TYPE_INFO) } |
-    atom <~ ".cast(BOOL)" ^^ { e => Cast(e, BasicTypeInfo.BOOLEAN_TYPE_INFO) } |
-    atom <~ ".cast(BOOLEAN)" ^^ { e => Cast(e, BasicTypeInfo.BOOLEAN_TYPE_INFO) } |
-    atom <~ ".cast(STRING)" ^^ { e => Cast(e, BasicTypeInfo.STRING_TYPE_INFO) } |
-    atom <~ ".cast(DATE)" ^^ { e => Cast(e, BasicTypeInfo.DATE_TYPE_INFO) }
-
-  lazy val as: PackratParser[Expression] = atom ~ ".as(" ~ fieldReference ~ ")" ^^ {
-    case e ~ _ ~ target ~ _ => Naming(e, target.name)
-  }
-
-  // general function calls
-
-  lazy val functionCall = ident ~ "(" ~ rep1sep(expression, ",") ~ ")" ^^ {
-    case name ~ _ ~ args ~ _ => Call(name.toUpperCase, args: _*)
-  }
-
-  lazy val functionCallWithoutArgs = ident ~ "()" ^^ {
-    case name ~ _ => Call(name.toUpperCase)
-  }
-
-  lazy val suffixFunctionCall = atom ~ "." ~ ident ~ "(" ~ rep1sep(expression, ",") ~ ")" ^^ {
-    case operand ~ _ ~ name ~ _ ~ args ~ _ => Call(name.toUpperCase, operand :: args : _*)
-  }
-
-  lazy val suffixFunctionCallWithoutArgs = atom ~ "." ~ ident ~ "()" ^^ {
-    case operand ~ _ ~ name ~ _ => Call(name.toUpperCase, operand)
-  }
-
-  // special calls
-
-  lazy val specialFunctionCalls = trim | trimWithoutArgs
-
-  lazy val specialSuffixFunctionCalls = suffixTrim | suffixTrimWithoutArgs
-
-  lazy val trimWithoutArgs = "trim(" ~ expression ~ ")" ^^ {
-    case _ ~ operand ~ _ =>
-      Call(
-        BuiltInFunctionNames.TRIM,
-        BuiltInFunctionConstants.TRIM_BOTH,
-        BuiltInFunctionConstants.TRIM_DEFAULT_CHAR,
-        operand)
-  }
-
-  lazy val suffixTrimWithoutArgs = atom ~ ".trim()" ^^ {
-    case operand ~ _ =>
-      Call(
-        BuiltInFunctionNames.TRIM,
-        BuiltInFunctionConstants.TRIM_BOTH,
-        BuiltInFunctionConstants.TRIM_DEFAULT_CHAR,
-        operand)
-  }
-
-  lazy val trim = "trim(" ~ ("BOTH" | "LEADING" | "TRAILING") ~ "," ~ expression ~
-      "," ~ expression ~ ")" ^^ {
-    case _ ~ trimType ~ _ ~ trimCharacter ~ _ ~ operand ~ _ =>
-      val flag = trimType match {
-        case "BOTH" => BuiltInFunctionConstants.TRIM_BOTH
-        case "LEADING" => BuiltInFunctionConstants.TRIM_LEADING
-        case "TRAILING" => BuiltInFunctionConstants.TRIM_TRAILING
-      }
-      Call(BuiltInFunctionNames.TRIM, flag, trimCharacter, operand)
-  }
-
-  lazy val suffixTrim = atom ~ ".trim(" ~ ("BOTH" | "LEADING" | "TRAILING") ~ "," ~
-      expression ~ ")" ^^ {
-    case operand ~ _ ~ trimType ~ _ ~ trimCharacter ~ _ =>
-      val flag = trimType match {
-        case "BOTH" => BuiltInFunctionConstants.TRIM_BOTH
-        case "LEADING" => BuiltInFunctionConstants.TRIM_LEADING
-        case "TRAILING" => BuiltInFunctionConstants.TRIM_TRAILING
-      }
-      Call(BuiltInFunctionNames.TRIM, flag, trimCharacter, operand)
-  }
-
-  lazy val suffix =
-    isNull | isNotNull |
-      sum | min | max | count | avg | cast |
-      specialFunctionCalls | functionCall | functionCallWithoutArgs |
-      specialSuffixFunctionCalls | suffixFunctionCall | suffixFunctionCallWithoutArgs |
-      atom
-
-  // unary ops
-
-  lazy val unaryNot: PackratParser[Expression] = "!" ~> suffix ^^ { e => Not(e) }
-
-  lazy val unaryMinus: PackratParser[Expression] = "-" ~> suffix ^^ { e => UnaryMinus(e) }
-
-  lazy val unary = unaryNot | unaryMinus | suffix
-
-  // arithmetic
-
-  lazy val product = unary * (
-    "*" ^^^ { (a:Expression, b:Expression) => Mul(a,b) } |
-      "/" ^^^ { (a:Expression, b:Expression) => Div(a,b) } |
-      "%" ^^^ { (a:Expression, b:Expression) => Mod(a,b) } )
-
-  lazy val term = product * (
-    "+" ^^^ { (a:Expression, b:Expression) => Plus(a,b) } |
-     "-" ^^^ { (a:Expression, b:Expression) => Minus(a,b) } )
-
-  // Comparison
-
-  lazy val equalTo: PackratParser[Expression] = term ~ ("===" | "=") ~ term ^^ {
-    case l ~ _ ~ r => EqualTo(l, r)
-  }
-
-  lazy val notEqualTo: PackratParser[Expression] = term ~ ("!==" | "!=" | "<>") ~ term ^^ {
-    case l ~ _ ~ r => NotEqualTo(l, r)
-  }
-
-  lazy val greaterThan: PackratParser[Expression] = term ~ ">" ~ term ^^ {
-    case l ~ _ ~ r => GreaterThan(l, r)
-  }
-
-  lazy val greaterThanOrEqual: PackratParser[Expression] = term ~ ">=" ~ term ^^ {
-    case l ~ _ ~ r => GreaterThanOrEqual(l, r)
-  }
-
-  lazy val lessThan: PackratParser[Expression] = term ~ "<" ~ term ^^ {
-    case l ~ _ ~ r => LessThan(l, r)
-  }
-
-  lazy val lessThanOrEqual: PackratParser[Expression] = term ~ "<=" ~ term ^^ {
-    case l ~ _ ~ r => LessThanOrEqual(l, r)
-  }
-
-  lazy val comparison: PackratParser[Expression] =
-      equalTo | notEqualTo |
-      greaterThan | greaterThanOrEqual |
-      lessThan | lessThanOrEqual | term
-
-  // logic
-
-  lazy val logic = comparison * (
-    "&&" ^^^ { (a:Expression, b:Expression) => And(a,b) } |
-      "||" ^^^ { (a:Expression, b:Expression) => Or(a,b) } )
-
-  // alias
-
-  lazy val alias: PackratParser[Expression] = logic ~ AS ~ fieldReference ^^ {
-    case e ~ _ ~ name => Naming(e, name.name)
-  } | logic
-
-  lazy val expression: PackratParser[Expression] = alias
-
-  lazy val expressionList: Parser[List[Expression]] = rep1sep(expression, ",")
-
-  def parseExpressionList(expression: String): List[Expression] = {
-    parseAll(expressionList, expression) match {
-      case Success(lst, _) => lst
-
-      case Failure(msg, _) => throw new ExpressionException("Could not parse expression: " + msg)
-
-      case Error(msg, _) => throw new ExpressionException("Could not parse expression: " + msg)
-    }
-  }
-
-  def parseExpression(exprString: String): Expression = {
-    parseAll(expression, exprString) match {
-      case Success(lst, _) => lst
-
-      case fail =>
-        throw new ExpressionException("Could not parse expression: " + fail.toString)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/63c6dad4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala
index af22768..f443155 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala
@@ -22,8 +22,7 @@ import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo}
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
-import org.apache.flink.api.table.parser.ExpressionParser
-import org.apache.flink.api.table.expressions.{Naming, Expression, UnresolvedFieldReference}
+import org.apache.flink.api.table.expressions.{ExpressionParser, Naming, Expression, UnresolvedFieldReference}
 import org.apache.flink.api.table.Table
 
 import scala.language.reflectiveCalls

http://git-wip-us.apache.org/repos/asf/flink/blob/63c6dad4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala
index 976f2f1..1668efb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala
@@ -24,6 +24,7 @@ import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.calcite.tools.RelBuilder
 import org.apache.calcite.tools.RelBuilder.AggCall
 import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.table.typeutils.TypeConverter
 
 import scala.collection.JavaConversions._
 
@@ -76,7 +77,7 @@ object RexNodeTranslator {
       // Basic operators
       case Literal(value, tpe) =>
         relBuilder.literal(value)
-      case ResolvedFieldReference(name, tpe) =>
+      case ResolvedFieldReference(name) =>
         relBuilder.field(name)
       case UnresolvedFieldReference(name) =>
         relBuilder.field(name)

http://git-wip-us.apache.org/repos/asf/flink/blob/63c6dad4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala
deleted file mode 100644
index 030d577..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala
+++ /dev/null
@@ -1,203 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan
-
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.core.JoinRelType
-import org.apache.calcite.rel.core.JoinRelType._
-import org.apache.calcite.sql.`type`.SqlTypeName
-import org.apache.calcite.sql.`type`.SqlTypeName._
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
-import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.java.operators.join.JoinType
-import org.apache.flink.api.java.tuple.Tuple
-import org.apache.flink.api.java.typeutils.ValueTypeInfo._
-import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo}
-import org.apache.flink.api.table.typeinfo.RowTypeInfo
-import org.apache.flink.api.table.{Row, TableException}
-
-import scala.collection.JavaConversions._
-
-object TypeConverter {
-
-  val DEFAULT_ROW_TYPE = new RowTypeInfo(Seq(), Seq()).asInstanceOf[TypeInformation[Any]]
-
-  def typeInfoToSqlType(typeInfo: TypeInformation[_]): SqlTypeName = typeInfo match {
-    case BOOLEAN_TYPE_INFO => BOOLEAN
-    case BOOLEAN_VALUE_TYPE_INFO => BOOLEAN
-    case BYTE_TYPE_INFO => TINYINT
-    case BYTE_VALUE_TYPE_INFO => TINYINT
-    case SHORT_TYPE_INFO => SMALLINT
-    case SHORT_VALUE_TYPE_INFO => SMALLINT
-    case INT_TYPE_INFO => INTEGER
-    case INT_VALUE_TYPE_INFO => INTEGER
-    case LONG_TYPE_INFO => BIGINT
-    case LONG_VALUE_TYPE_INFO => BIGINT
-    case FLOAT_TYPE_INFO => FLOAT
-    case FLOAT_VALUE_TYPE_INFO => FLOAT
-    case DOUBLE_TYPE_INFO => DOUBLE
-    case DOUBLE_VALUE_TYPE_INFO => DOUBLE
-    case STRING_TYPE_INFO => VARCHAR
-    case STRING_VALUE_TYPE_INFO => VARCHAR
-    case DATE_TYPE_INFO => DATE
-
-    case CHAR_TYPE_INFO | CHAR_VALUE_TYPE_INFO =>
-      throw new TableException("Character type is not supported.")
-
-//    case t: TupleTypeInfo[_] => ROW
-//    case c: CaseClassTypeInfo[_] => ROW
-//    case p: PojoTypeInfo[_] => STRUCTURED
-//    case g: GenericTypeInfo[_] => OTHER
-
-    case t@_ =>
-      throw new TableException(s"Type is not supported: $t")
-  }
-
-  def sqlTypeToTypeInfo(sqlType: SqlTypeName): TypeInformation[_] = sqlType match {
-    case BOOLEAN => BOOLEAN_TYPE_INFO
-    case TINYINT => BYTE_TYPE_INFO
-    case SMALLINT => SHORT_TYPE_INFO
-    case INTEGER => INT_TYPE_INFO
-    case BIGINT => LONG_TYPE_INFO
-    case FLOAT => FLOAT_TYPE_INFO
-    case DOUBLE => DOUBLE_TYPE_INFO
-    case VARCHAR | CHAR => STRING_TYPE_INFO
-    case DATE => DATE_TYPE_INFO
-
-    // symbols for special flags, e.g. TRIM's BOTH, LEADING, TRAILING,
-    // are represented as integers
-    case SYMBOL => INT_TYPE_INFO
-
-    case _ =>
-      println(sqlType)
-      ??? // TODO more types
-  }
-
-  /**
-    * Determines the return type of Flink operators based on the logical fields, the expected
-    * physical type and configuration parameters.
-    *
-    * For example:
-    *   - No physical type expected, only 3 non-null fields and efficient type usage enabled
-    *       -> return Tuple3
-    *   - No physical type expected, efficient type usage enabled, but 3 nullable fields
-    *       -> return Row because Tuple does not support null values
-    *   - Physical type expected
-    *       -> check if physical type is compatible and return it
-    *
-    * @param logicalRowType logical row information
-    * @param expectedPhysicalType expected physical type
-    * @param nullable fields can be nullable
-    * @param useEfficientTypes use the most efficient types (e.g. Tuples and value types)
-    * @return suitable return type
-    */
-  def determineReturnType(
-      logicalRowType: RelDataType,
-      expectedPhysicalType: Option[TypeInformation[Any]],
-      nullable: Boolean,
-      useEfficientTypes: Boolean)
-    : TypeInformation[Any] = {
-    // convert to type information
-    val logicalFieldTypes = logicalRowType.getFieldList map { relDataType =>
-      TypeConverter.sqlTypeToTypeInfo(relDataType.getType.getSqlTypeName)
-    }
-    // field names
-    val logicalFieldNames = logicalRowType.getFieldNames
-
-    val returnType = expectedPhysicalType match {
-      // a certain physical type is expected (but not Row)
-      // check if expected physical type is compatible with logical field type
-      case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] =>
-        if (typeInfo.getArity != logicalFieldTypes.length) {
-          throw new TableException("Arity of result does not match expected type.")
-        }
-        typeInfo match {
-
-          // POJO type expected
-          case pt: PojoTypeInfo[_] =>
-            logicalFieldNames.zip(logicalFieldTypes) foreach {
-              case (fName, fType) =>
-                val pojoIdx = pt.getFieldIndex(fName)
-                if (pojoIdx < 0) {
-                  throw new TableException(s"POJO does not define field name: $fName")
-                }
-                val expectedTypeInfo = pt.getTypeAt(pojoIdx)
-                if (fType != expectedTypeInfo) {
-                  throw new TableException(s"Result field does not match expected type. " +
-                    s"Expected: $expectedTypeInfo; Actual: $fType")
-                }
-            }
-
-          // Tuple/Case class type expected
-          case ct: CompositeType[_] =>
-            logicalFieldTypes.zipWithIndex foreach {
-              case (fieldTypeInfo, i) =>
-                val expectedTypeInfo = ct.getTypeAt(i)
-                if (fieldTypeInfo != expectedTypeInfo) {
-                  throw new TableException(s"Result field does not match expected type. " +
-                    s"Expected: $expectedTypeInfo; Actual: $fieldTypeInfo")
-                }
-            }
-
-          // Atomic type expected
-          case at: AtomicType[_] =>
-            val fieldTypeInfo = logicalFieldTypes.head
-            if (fieldTypeInfo != at) {
-              throw new TableException(s"Result field does not match expected type. " +
-                s"Expected: $at; Actual: $fieldTypeInfo")
-            }
-
-          case _ =>
-            throw new TableException("Unsupported result type.")
-        }
-        typeInfo
-
-      // Row is expected, create a RowTypeInfo from the logical field types and names
-      case Some(typeInfo) if typeInfo.getTypeClass == classOf[Row] =>
-        new RowTypeInfo(logicalFieldTypes, logicalFieldNames)
-
-      // no physical type
-      // determine type based on logical fields and configuration parameters
-      case None =>
-        // no need for efficient types -> use Row
-        // we cannot use efficient types if row arity > tuple arity or nullable
-        if (!useEfficientTypes || logicalFieldTypes.length > Tuple.MAX_ARITY || nullable) {
-          new RowTypeInfo(logicalFieldTypes, logicalFieldNames)
-        }
-        // use efficient type tuple or atomic type
-        else {
-          if (logicalFieldTypes.length == 1) {
-            logicalFieldTypes.head
-          }
-          else {
-            new TupleTypeInfo[Tuple](logicalFieldTypes.toArray:_*)
-          }
-        }
-    }
-    returnType.asInstanceOf[TypeInformation[Any]]
-  }
-
-  def sqlJoinTypeToFlinkJoinType(sqlJoinType: JoinRelType): JoinType = sqlJoinType match {
-    case INNER => JoinType.INNER
-    case LEFT => JoinType.LEFT_OUTER
-    case RIGHT => JoinType.RIGHT_OUTER
-    case FULL => JoinType.FULL_OUTER
-  }
-}
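
The selection rules documented in determineReturnType above reduce to: fall
back to Row whenever fields may be null, efficient types are disabled, or the
arity exceeds what Flink tuples support; otherwise unwrap a single field to
its atomic type, or use a flat Tuple. A dependency-free sketch of that
decision follows; the names are illustrative only, since the real method
operates on Calcite's RelDataType and Flink's TypeInformation:

    object ReturnTypeSketch {
      val TupleMaxArity = 25 // Flink tuples go up to Tuple25

      sealed trait Physical
      case object RowResult extends Physical                 // supports null fields
      case class TupleResult(arity: Int) extends Physical    // efficient flat type
      case class AtomicResult(name: String) extends Physical // single unwrapped field

      def choose(fieldTypes: Seq[String], nullable: Boolean, efficient: Boolean): Physical =
        if (!efficient || nullable || fieldTypes.length > TupleMaxArity)
          RowResult                      // Row is the safe fallback
        else if (fieldTypes.length == 1)
          AtomicResult(fieldTypes.head)  // one field -> atomic type
        else
          TupleResult(fieldTypes.length) // several fields -> Tuple

      def main(args: Array[String]): Unit = {
        println(choose(Seq("Int", "String"), nullable = false, efficient = true)) // TupleResult(2)
        println(choose(Seq("Int", "String"), nullable = true, efficient = true))  // RowResult
        println(choose(Seq("Long"), nullable = false, efficient = true))          // AtomicResult(Long)
      }
    }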

http://git-wip-us.apache.org/repos/asf/flink/blob/63c6dad4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
index d9a0a93..ce60621 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
@@ -24,10 +24,10 @@ import org.apache.calcite.rel.core.AggregateCall
 import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.table.plan.{PlanGenException, TypeConverter}
+import org.apache.flink.api.table.plan.PlanGenException
 import org.apache.flink.api.table.runtime.aggregate.AggregateUtil
 import org.apache.flink.api.table.runtime.aggregate.AggregateUtil.CalcitePair
-import org.apache.flink.api.table.typeinfo.RowTypeInfo
+import org.apache.flink.api.table.typeutils.{TypeConverter, RowTypeInfo}
 import org.apache.flink.api.table.{Row, TableConfig}
 
 import scala.collection.JavaConverters._

http://git-wip-us.apache.org/repos/asf/flink/blob/63c6dad4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala
index 8cb901c..8f2dc87 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala
@@ -25,7 +25,8 @@ import org.apache.flink.api.common.functions.FlatMapFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
 import org.apache.flink.api.table.codegen.CodeGenerator
-import org.apache.flink.api.table.plan.TypeConverter._
+import org.apache.flink.api.table.typeutils.TypeConverter
+import TypeConverter._
 import org.apache.flink.api.table.runtime.FlatMapRunner
 import org.apache.flink.api.table.TableConfig
 import org.apache.calcite.rex._

http://git-wip-us.apache.org/repos/asf/flink/blob/63c6dad4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
index ffb3e1b..4a2b375 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
@@ -29,9 +29,10 @@ import org.apache.flink.api.java.DataSet
 import org.apache.flink.api.java.operators.join.JoinType
 import org.apache.flink.api.table.codegen.CodeGenerator
 import org.apache.flink.api.table.runtime.FlatJoinRunner
+import org.apache.flink.api.table.typeutils.TypeConverter
 import org.apache.flink.api.table.{TableException, TableConfig}
 import org.apache.flink.api.common.functions.FlatJoinFunction
-import org.apache.flink.api.table.plan.TypeConverter._
+import TypeConverter.determineReturnType
 import scala.collection.mutable.ArrayBuffer
 import org.apache.calcite.rex.RexNode
 

http://git-wip-us.apache.org/repos/asf/flink/blob/63c6dad4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSource.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSource.scala
index a3801c9..18537ff 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSource.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSource.scala
@@ -28,7 +28,8 @@ import org.apache.flink.api.java.DataSet
 import org.apache.flink.api.java.typeutils.PojoTypeInfo
 import org.apache.flink.api.table.TableConfig
 import org.apache.flink.api.table.codegen.CodeGenerator
-import org.apache.flink.api.table.plan.TypeConverter.determineReturnType
+import org.apache.flink.api.table.typeutils.TypeConverter
+import TypeConverter.determineReturnType
 import org.apache.flink.api.table.plan.schema.DataSetTable
 import org.apache.flink.api.table.runtime.MapRunner
 

http://git-wip-us.apache.org/repos/asf/flink/blob/63c6dad4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataSetTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataSetTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataSetTable.scala
index 75090a2..b33f66d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataSetTable.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataSetTable.scala
@@ -31,7 +31,7 @@ import org.apache.calcite.util.ImmutableBitSet
 import org.apache.flink.api.common.typeinfo.AtomicType
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.table.plan.TypeConverter
+import org.apache.flink.api.table.typeutils.TypeConverter
 
 class DataSetTable[T](
     val dataSet: DataSet[T],

http://git-wip-us.apache.org/repos/asf/flink/blob/63c6dad4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
index 70d0497..77e896f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
@@ -26,9 +26,9 @@ import org.apache.calcite.sql.`type`.SqlTypeName._
 import org.apache.calcite.sql.`type`.{SqlTypeFactoryImpl, SqlTypeName}
 import org.apache.calcite.sql.fun._
 import org.apache.flink.api.common.functions.{GroupReduceFunction, MapFunction}
-import org.apache.flink.api.table.plan.{TypeConverter, PlanGenException}
-import org.apache.flink.api.table.plan.TypeConverter._
-import org.apache.flink.api.table.typeinfo.RowTypeInfo
+import org.apache.flink.api.table.plan.PlanGenException
+import org.apache.flink.api.table.typeutils.{TypeConverter, RowTypeInfo}
+import TypeConverter._
 import org.apache.flink.api.table.{Row, TableConfig}
 
 import scala.collection.JavaConversions._

http://git-wip-us.apache.org/repos/asf/flink/blob/63c6dad4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
index 53c63cb..709af62 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
@@ -31,8 +31,7 @@ import org.apache.flink.api.java.io.DiscardingOutputFormat
 import org.apache.flink.api.table.explain.PlanJsonParser
 import org.apache.flink.api.table.plan.{PlanGenException, RexNodeTranslator}
 import RexNodeTranslator.{toRexNode, extractAggCalls}
-import org.apache.flink.api.table.expressions.{Naming, UnresolvedFieldReference, Expression}
-import org.apache.flink.api.table.parser.ExpressionParser
+import org.apache.flink.api.table.expressions.{ExpressionParser, Naming, UnresolvedFieldReference, Expression}
 
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._

http://git-wip-us.apache.org/repos/asf/flink/blob/63c6dad4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/trees/TreeNode.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/trees/TreeNode.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/trees/TreeNode.scala
deleted file mode 100644
index 84f1d7e..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/trees/TreeNode.scala
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.trees
-
-/**
- * Generic base class for trees that can be transformed and traversed.
- */
-abstract class TreeNode[A <: TreeNode[A]] { self: A with Product =>
-
-  /**
-   * List of child nodes that should be considered when doing transformations. Other values
-   * in the Product will not be transformed, only handed through.
-   */
-  def children: Seq[A]
-
-  /**
-   * Tests for equality by first testing for reference equality.
-   */
-  def fastEquals(other: TreeNode[_]): Boolean = this.eq(other) || this == other
-
-  def transformPre(rule: PartialFunction[A, A]): A = {
-    val afterTransform = rule.applyOrElse(this, identity[A])
-
-    if (afterTransform fastEquals this) {
-      this.transformChildrenPre(rule)
-    } else {
-      afterTransform.transformChildrenPre(rule)
-    }
-  }
-
-  def transformChildrenPre(rule: PartialFunction[A, A]): A = {
-    var changed = false
-    val newArgs = productIterator map {
-      case child: A if children.contains(child) =>
-        val newChild = child.transformPre(rule)
-        if (newChild fastEquals child) {
-          child
-        } else {
-          changed = true
-          newChild
-        }
-      case other: AnyRef => other
-      case null => null
-    } toArray
-
-    if (changed) makeCopy(newArgs) else this
-  }
-
-  def transformPost(rule: PartialFunction[A, A]): A = {
-    val afterChildren = transformChildrenPost(rule)
-    if (afterChildren fastEquals this) {
-      rule.applyOrElse(this, identity[A])
-    } else {
-      rule.applyOrElse(afterChildren, identity[A])
-    }
-  }
-
-  def transformChildrenPost(rule: PartialFunction[A, A]): A = {
-    var changed = false
-    val newArgs = productIterator map {
-      case child: A if children.contains(child) =>
-        val newChild = child.transformPost(rule)
-        if (newChild fastEquals child) {
-          child
-        } else {
-          changed = true
-          newChild
-        }
-      case other: AnyRef => other
-      case null => null
-    } toArray
-    // toArray forces evaluation, toSeq does not seem to work here
-
-    if (changed) makeCopy(newArgs) else this
-  }
-
-  def exists(predicate: A => Boolean): Boolean = {
-    var exists = false
-    this.transformPre {
-      case e: A => if (predicate(e)) {
-        exists = true
-      }
-        e
-    }
-    exists
-  }
-
-  /**
-   * Creates a new copy of this expression with new children. This is used during transformation
-   * if children change. This must be overridden by tree nodes whose constructor
-   * arguments are not in the same order as `children`.
-   */
-  def makeCopy(newArgs: Seq[AnyRef]): this.type = {
-    val defaultCtor =
-      this.getClass.getConstructors.find { _.getParameterTypes.size > 0}.head
-    try {
-      defaultCtor.newInstance(newArgs.toArray: _*).asInstanceOf[this.type]
-    } catch {
-      case iae: IllegalArgumentException =>
-        println("IAE " + this)
-        throw new RuntimeException("Should never happen.")
-    }
-  }
-}
-
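
The transformPre/transformPost hooks above drive all rule-based rewriting of
expression trees; this commit moves the class to the expressions package (see
the new expressions/TreeNode.scala in the diff stat). A minimal self-contained
sketch of the bottom-up style, using hypothetical Num/Add nodes rather than
real Table API expressions and assuming the relocated import path:

    import org.apache.flink.api.table.expressions.TreeNode

    object TreeFoldExample {
      sealed abstract class Expr extends TreeNode[Expr] with Product
      case class Num(v: Int) extends Expr { def children = Nil }
      case class Add(left: Expr, right: Expr) extends Expr { def children = Seq(left, right) }

      def main(args: Array[String]): Unit = {
        val tree = Add(Num(1), Add(Num(2), Num(3)))
        // transformPost rewrites children first: Add(2,3) folds to Num(5),
        // then the enclosing Add(1,5) folds to Num(6)
        val folded = tree.transformPost {
          case Add(Num(a), Num(b)) => Num(a + b)
        }
        println(folded) // Num(6)
      }
    }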

http://git-wip-us.apache.org/repos/asf/flink/blob/63c6dad4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/NullAwareComparator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/NullAwareComparator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/NullAwareComparator.scala
deleted file mode 100644
index ebbd471..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/NullAwareComparator.scala
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.typeinfo
-
-import org.apache.flink.api.common.typeutils.{CompositeTypeComparator, TypeComparator}
-import org.apache.flink.core.memory.{DataInputView, DataOutputView, MemorySegment}
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable.ArrayBuffer
-
-/**
- * Null-aware comparator that wraps a comparator which does not support null references.
- *
- * NOTE: This class assumes that it is used within a composite type comparator (such
- * as [[RowComparator]]) that handles serialized comparison.
- */
-class NullAwareComparator[T](
-    val wrappedComparator: TypeComparator[T],
-    val order: Boolean)
-  extends TypeComparator[T] {
-
-  // number of flat fields
-  private val flatFields = wrappedComparator.getFlatComparators.length
-
-  // stores the null for reference comparison
-  private var nullReference = false
-
-  override def hash(record: T): Int = {
-    if (record != null) {
-      wrappedComparator.hash(record)
-    }
-    else {
-      0
-    }
-  }
-
-  override def getNormalizeKeyLen: Int = {
-    val len = wrappedComparator.getNormalizeKeyLen
-    if (len == Integer.MAX_VALUE) {
-      Integer.MAX_VALUE
-    }
-    else {
-      len + 1 // add one for a null byte
-    }
-  }
-
-  override def putNormalizedKey(
-      record: T,
-      target: MemorySegment,
-      offset: Int,
-      numBytes: Int)
-    : Unit = {
-    if (numBytes > 0) {
-      // write a null byte with padding
-      if (record == null) {
-        target.putBoolean(offset, false)
-        // write padding
-        var j = 0
-        while (j < numBytes - 1) {
-          target.put(offset + 1 + j, 0.toByte)
-          j += 1
-        }
-      }
-      // write a non-null byte with key
-      else {
-        target.putBoolean(offset, true)
-        // write key
-        wrappedComparator.putNormalizedKey(record, target, offset + 1, numBytes - 1)
-      }
-    }
-  }
-
-  override def invertNormalizedKey(): Boolean = wrappedComparator.invertNormalizedKey()
-
-  override def supportsSerializationWithKeyNormalization(): Boolean = false
-
-  override def writeWithKeyNormalization(record: T, target: DataOutputView): Unit =
-    throw new UnsupportedOperationException("Record serialization with leading normalized keys" +
-      " not supported.")
-
-  override def readWithKeyDenormalization(reuse: T, source: DataInputView): T =
-    throw new UnsupportedOperationException("Record deserialization with leading normalized keys" +
-      " not supported.")
-
-  override def isNormalizedKeyPrefixOnly(keyBytes: Int): Boolean =
-    wrappedComparator.isNormalizedKeyPrefixOnly(keyBytes - 1)
-
-  override def setReference(toCompare: T): Unit = {
-    if (toCompare == null) {
-      nullReference = true
-    }
-    else {
-      nullReference = false
-      wrappedComparator.setReference(toCompare)
-    }
-  }
-
-  override def compare(first: T, second: T): Int = {
-    // both values are null -> equality
-    if (first == null && second == null) {
-      0
-    }
-    // first value is null -> inequality
-    // but order is considered
-    else if (first == null) {
-      if (order) -1 else 1
-    }
-    // second value is null -> inequality
-    // but order is considered
-    else if (second == null) {
-      if (order) 1 else -1
-    }
-    // no null values
-    else {
-      wrappedComparator.compare(first, second)
-    }
-  }
-
-  override def compareToReference(referencedComparator: TypeComparator[T]): Int = {
-    val otherComparator = referencedComparator.asInstanceOf[NullAwareComparator[T]]
-    val otherNullReference = otherComparator.nullReference
-    // both values are null -> equality
-    if (nullReference && otherNullReference) {
-      0
-    }
-    // first value is null -> inequality
-    // but order is considered
-    else if (nullReference) {
-      if (order) 1 else -1
-    }
-    // second value is null -> inequality
-    // but order is considered
-    else if (otherNullReference) {
-      if (order) -1 else 1
-    }
-    // no null values
-    else {
-      wrappedComparator.compareToReference(otherComparator.wrappedComparator)
-    }
-  }
-
-  override def supportsNormalizedKey(): Boolean = wrappedComparator.supportsNormalizedKey()
-
-  override def equalToReference(candidate: T): Boolean = {
-    // both values are null
-    if (candidate == null && nullReference) {
-      true
-    }
-    // one value is null
-    else if (candidate == null || nullReference) {
-      false
-    }
-    // no null value
-    else {
-      wrappedComparator.equalToReference(candidate)
-    }
-  }
-
-  override def duplicate(): TypeComparator[T] = {
-    new NullAwareComparator[T](wrappedComparator.duplicate(), order)
-  }
-
-  override def extractKeys(record: Any, target: Array[AnyRef], index: Int): Int = {
-    if (record == null) {
-      var i = 0
-      while (i < flatFields) {
-        target(index + i) = null
-        i += 1
-      }
-      flatFields
-    }
-    else {
-      wrappedComparator.extractKeys(record, target, index)
-    }
-  }
-
-
-  override def getFlatComparators: Array[TypeComparator[_]] = {
-    // determine the flat comparators and wrap them again in null-aware comparators
-    val flatComparators = new ArrayBuffer[TypeComparator[_]]()
-    wrappedComparator match {
-      case ctc: CompositeTypeComparator[_] => ctc.getFlatComparator(flatComparators)
-      case c: TypeComparator[_] => flatComparators += c
-    }
-    val wrappedComparators = flatComparators.map { c =>
-      new NullAwareComparator[Any](c.asInstanceOf[TypeComparator[Any]], order)
-    }
-    wrappedComparators.toArray[TypeComparator[_]]
-  }
-
-  /**
-   * This method is not implemented here. It must be implemented by the comparator this class
-   * is contained in (e.g. RowComparator).
-   *
-   * @param firstSource The input view containing the first record.
-   * @param secondSource The input view containing the second record.
-   * @return An integer defining the order among the objects in the same way as
-   *         {@link java.util.Comparator#compare(Object, Object)}.
-   */
-  override def compareSerialized(firstSource: DataInputView, secondSource: DataInputView): Int =
-    throw new UnsupportedOperationException("Comparator does not support null-aware serialized " +
-      "comparision.")
-}
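
The compare logic above gives nulls a stable position relative to the sort
order: ascending puts a null before any value, descending after it. A quick
sanity sketch wrapping Flink's IntComparator, assuming the relocated package
path introduced by this commit:

    import org.apache.flink.api.common.typeutils.base.IntComparator
    import org.apache.flink.api.table.typeutils.NullAwareComparator

    object NullOrderExample {
      def main(args: Array[String]): Unit = {
        // ascending order: nulls compare as smaller than any value
        val asc = new NullAwareComparator[Integer](new IntComparator(true), true)
        println(asc.compare(null, 5))    // -1: null sorts first
        println(asc.compare(5, null))    //  1
        println(asc.compare(null, null)) //  0: two nulls are equal
        println(asc.compare(3, 5))       // negative: delegated to IntComparator
      }
    }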

http://git-wip-us.apache.org/repos/asf/flink/blob/63c6dad4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/NullMaskUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/NullMaskUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/NullMaskUtils.scala
deleted file mode 100644
index 40a39dd..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/NullMaskUtils.scala
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.typeinfo
-
-import org.apache.flink.api.table.Row
-import org.apache.flink.core.memory.{DataInputView, DataOutputView}
-
-object NullMaskUtils {
-
-  def writeNullMask(len: Int, value: Row, target: DataOutputView): Unit = {
-    var b = 0x00
-    var bytePos = 0
-
-    var fieldPos = 0
-    var numPos = 0
-    while (fieldPos < len) {
-      b = 0x00
-      // set bits in byte
-      bytePos = 0
-      numPos = Math.min(8, len - fieldPos)
-      while (bytePos < numPos) {
-        b = b << 1
-        // set bit if field is null
-        if(value.productElement(fieldPos + bytePos) == null) {
-          b |= 0x01
-        }
-        bytePos += 1
-      }
-      fieldPos += numPos
-      // shift bits if last byte is not completely filled
-      b <<= (8 - bytePos)
-      // write byte
-      target.writeByte(b)
-    }
-  }
-
-  def readIntoNullMask(len: Int, source: DataInputView, nullMask: Array[Boolean]): Unit = {
-    var b = 0x00
-    var bytePos = 0
-
-    var fieldPos = 0
-    var numPos = 0
-    while (fieldPos < len) {
-      // read byte
-      b = source.readUnsignedByte()
-      bytePos = 0
-      numPos = Math.min(8, len - fieldPos)
-      while (bytePos < numPos) {
-        nullMask(fieldPos + bytePos) = (b & 0x80) > 0
-        b = b << 1
-        bytePos += 1
-      }
-      fieldPos += numPos
-    }
-  }
-
-  def readIntoAndCopyNullMask(
-      len: Int,
-      source: DataInputView,
-      target: DataOutputView,
-      nullMask: Array[Boolean]): Unit = {
-    var b = 0x00
-    var bytePos = 0
-
-    var fieldPos = 0
-    var numPos = 0
-    while (fieldPos < len) {
-      // read byte
-      b = source.readUnsignedByte()
-      // copy byte
-      target.writeByte(b)
-      bytePos = 0
-      numPos = Math.min(8, len - fieldPos)
-      while (bytePos < numPos) {
-        nullMask(fieldPos + bytePos) = (b & 0x80) > 0
-        b = b << 1
-        bytePos += 1
-      }
-      fieldPos += numPos
-    }
-  }
-
-}
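
writeNullMask above packs eight fields per byte, most significant bit first,
setting a bit when the field is null and zero-padding the final byte on the
right. A dependency-free restatement of that bit layout (illustrative helper;
the real utility writes to a Flink DataOutputView):

    object NullMaskSketch {
      // Mirrors writeNullMask: one bit per field, MSB first, 1 = null,
      // last byte right-padded with zeros.
      def nullMaskBytes(fields: Seq[Any]): Seq[Int] =
        fields.grouped(8).map { group =>
          val bits = group.foldLeft(0) { (acc, f) =>
            (acc << 1) | (if (f == null) 1 else 0)
          }
          bits << (8 - group.size) // pad the tail byte, as writeNullMask does
        }.toSeq

      def main(args: Array[String]): Unit = {
        // A three-field row ("a", null, 42) yields binary 0100 0000 = 0x40:
        // only the second field is null, so the second-highest bit is set.
        println(nullMaskBytes(Seq("a", null, 42)).map(b => f"0x$b%02X")) // List(0x40)
      }
    }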

