flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From twal...@apache.org
Subject flink git commit: [FLINK-6587] [table] Simplification and bug fixing of the ExpressionParser
Date Wed, 17 May 2017 12:19:40 GMT
Repository: flink
Updated Branches:
  refs/heads/master a2580171d -> 9244106b3


[FLINK-6587] [table] Simplification and bug fixing of the ExpressionParser

This closes #3923.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9244106b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9244106b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9244106b

Branch: refs/heads/master
Commit: 9244106b334ef54ba3e39a3f2c0c76f46ae4ecd3
Parents: a258017
Author: twalthr <twalthr@apache.org>
Authored: Mon May 15 15:27:10 2017 +0200
Committer: twalthr <twalthr@apache.org>
Committed: Wed May 17 14:15:07 2017 +0200

----------------------------------------------------------------------
 docs/dev/table_api.md                           |  24 +-
 .../org/apache/flink/table/api/table.scala      |  16 +-
 .../apache/flink/table/codegen/Compiler.scala   |   4 +-
 .../table/expressions/ExpressionParser.scala    | 397 ++++++++-----------
 .../flink/table/plan/ProjectionTranslator.scala |   2 +-
 .../flink/table/validate/FunctionCatalog.scala  |  26 +-
 .../CastingStringExpressionTest.scala           |   6 +-
 .../table/expressions/DecimalTypeTest.scala     |   4 +-
 .../table/expressions/ScalarFunctionsTest.scala |   6 +-
 .../table/expressions/TemporalTypesTest.scala   |  32 +-
 .../UserDefinedScalarFunctionTest.scala         |  31 +-
 .../plan/util/RexProgramExtractorTest.scala     |   6 +-
 12 files changed, 251 insertions(+), 303 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9244106b/docs/dev/table_api.md
----------------------------------------------------------------------
diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md
index d105188..6a5ceee 100644
--- a/docs/dev/table_api.md
+++ b/docs/dev/table_api.md
@@ -1038,9 +1038,9 @@ This is the EBNF grammar for expressions:
 
 expressionList = expression , { "," , expression } ;
 
-expression = alias ;
+expression = timeIndicator | overConstant | alias ;
 
-alias = logic | ( logic , "AS" , fieldReference ) ;
+alias = logic | ( logic , "as" , fieldReference ) | ( logic , "as" , "(" , fieldReference , { "," , fieldReference } , ")" ) ;
 
 logic = comparison , [ ( "&&" | "||" ) , comparison ] ;
 
@@ -1052,9 +1052,11 @@ product = unary , [ ( "*" | "/" | "%") , unary ] ;
 
 unary = [ "!" | "-" ] , composite ;
 
-composite = suffixed | atom ;
+composite = over | nullLiteral | suffixed | atom ;
 
-suffixed = interval | cast | as | aggregation | if | functionCall ;
+suffixed = interval | cast | as | if | functionCall ;
+
+interval = timeInterval | rowInterval ;
 
 timeInterval = composite , "." , ("year" | "years" | "month" | "months" | "day" | "days" | "hour" | "hours" | "minute" | "minutes" | "second" | "seconds" | "milli" | "millis") ;
 
@@ -1062,17 +1064,15 @@ rowInterval = composite , "." , "rows" ;
 
 cast = composite , ".cast(" , dataType , ")" ;
 
-dataType = "BYTE" | "SHORT" | "INT" | "LONG" | "FLOAT" | "DOUBLE" | "BOOLEAN" | "STRING" | "DECIMAL" | "DATE" | "TIME" | "TIMESTAMP" | "INTERVAL_MONTHS" | "INTERVAL_MILLIS" ;
+dataType = "BYTE" | "SHORT" | "INT" | "LONG" | "FLOAT" | "DOUBLE" | "BOOLEAN" | "STRING" | "DECIMAL" | "SQL_DATE" | "SQL_TIME" | "SQL_TIMESTAMP" | "INTERVAL_MONTHS" | "INTERVAL_MILLIS" | ( "PRIMITIVE_ARRAY" , "(" , dataType , ")" ) | ( "OBJECT_ARRAY" , "(" , dataType , ")" ) ;
 
 as = composite , ".as(" , fieldReference , ")" ;
 
-aggregation = composite , ( ".sum" | ".sum0" | ".min" | ".max" | ".count" | ".avg" | ".start" | ".end" | ".stddev_pop" | ".stddev_samp" | ".var_pop" | ".var_samp" ) , [ "()" ] ;
-
 if = composite , ".?(" , expression , "," , expression , ")" ;
 
 functionCall = composite , "." , functionIdentifier , [ "(" , [ expression , { "," , expression } ] , ")" ] ;
 
-atom = ( "(" , expression , ")" ) | literal | nullLiteral | fieldReference ;
+atom = ( "(" , expression , ")" ) | literal | fieldReference ;
 
 fieldReference = "*" | identifier ;
 
@@ -1082,10 +1082,16 @@ timeIntervalUnit = "YEAR" | "YEAR_TO_MONTH" | "MONTH" | "DAY" | "DAY_TO_HOUR" |
 
 timePointUnit = "YEAR" | "MONTH" | "DAY" | "HOUR" | "MINUTE" | "SECOND" | "QUARTER" | "WEEK" | "MILLISECOND" | "MICROSECOND" ;
 
+over = composite , "over" , fieldReference ;
+
+overConstant = "current_row" | "current_range" | "unbounded_row" | "unbounded_row" ;
+
+timeIndicator = fieldReference , "." , ( "proctime" | "rowtime" ) ;
+
 {% endhighlight %}
 
 Here, `literal` is a valid Java literal, `fieldReference` specifies a column in the data (or all columns if `*` is used), and `functionIdentifier` specifies a supported scalar function. The
-column names and function names follow Java identifier syntax. The column name `rowtime` is a reserved logical attribute in streaming environments. Expressions specified as Strings can also use prefix notation instead of suffix notation to call operators and functions.
+column names and function names follow Java identifier syntax. Expressions specified as Strings can also use prefix notation instead of suffix notation to call operators and functions.
 
 If working with exact numeric values or large decimals is required, the Table API also supports Java's BigDecimal type. In the Scala Table API decimals can be defined by `BigDecimal("123456")` and in Java by appending a "p" for precise e.g. `123456p`.
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9244106b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
index ca61c65..2bcb3d8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
@@ -906,14 +906,15 @@ class GroupedTable(
     * }}}
     */
   def select(fields: Expression*): Table = {
-    val (aggNames, propNames) = extractAggregationsAndProperties(fields, table.tableEnv)
+    val expandedFields = expandProjectList(fields, table.logicalPlan, table.tableEnv)
+    val (aggNames, propNames) = extractAggregationsAndProperties(expandedFields, table.tableEnv)
     if (propNames.nonEmpty) {
       throw ValidationException("Window properties can only be used on windowed tables.")
     }
 
     val projectsOnAgg = replaceAggregationsAndProperties(
-      fields, table.tableEnv, aggNames, propNames)
-    val projectFields = extractFieldReferences(fields ++ groupKey)
+      expandedFields, table.tableEnv, aggNames, propNames)
+    val projectFields = extractFieldReferences(expandedFields ++ groupKey)
 
     new Table(table.tableEnv,
       Project(projectsOnAgg,
@@ -1034,14 +1035,13 @@ class WindowGroupedTable(
     * }}}
     */
   def select(fields: Expression*): Table = {
-    // get group keys by removing window alias
-
-    val (aggNames, propNames) = extractAggregationsAndProperties(fields, table.tableEnv)
+    val expandedFields = expandProjectList(fields, table.logicalPlan, table.tableEnv)
+    val (aggNames, propNames) = extractAggregationsAndProperties(expandedFields, table.tableEnv)
 
     val projectsOnAgg = replaceAggregationsAndProperties(
-      fields, table.tableEnv, aggNames, propNames)
+      expandedFields, table.tableEnv, aggNames, propNames)
 
-    val projectFields = extractFieldReferences(fields ++ groupKeys :+ window.timeField)
+    val projectFields = extractFieldReferences(expandedFields ++ groupKeys :+ window.timeField)
 
     new Table(table.tableEnv,
       Project(

http://git-wip-us.apache.org/repos/asf/flink/blob/9244106b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/Compiler.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/Compiler.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/Compiler.scala
index 4c12003..4fcfab0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/Compiler.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/Compiler.scala
@@ -32,9 +32,9 @@ trait Compiler[T] {
     try {
       compiler.cook(code)
     } catch {
-      case e: CompileException =>
+      case t: Throwable =>
         throw new InvalidProgramException("Table program cannot be compiled. " +
-          "This is a bug. Please file an issue.", e)
+          "This is a bug. Please file an issue.", t)
     }
     compiler.getClassLoader.loadClass(name).asInstanceOf[Class[T]]
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/9244106b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
index 98580ba..f995a96 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
@@ -18,16 +18,15 @@
 package org.apache.flink.table.expressions
 
 import org.apache.calcite.avatica.util.DateTimeUtils.{MILLIS_PER_DAY, MILLIS_PER_HOUR, MILLIS_PER_MINUTE, MILLIS_PER_SECOND}
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo, TypeInformation}
-import org.apache.flink.table.api.{ExpressionParserException, CurrentRow, CurrentRange, UnboundedRow, UnboundedRange}
+import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.table.api._
 import org.apache.flink.table.expressions.ExpressionUtils.{toMilliInterval, toMonthInterval}
 import org.apache.flink.table.expressions.TimeIntervalUnit.TimeIntervalUnit
 import org.apache.flink.table.expressions.TimePointUnit.TimePointUnit
 import org.apache.flink.table.expressions.TrimMode.TrimMode
-import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
 
-import scala.language.implicitConversions
-import scala.util.parsing.combinator.{JavaTokenParsers, PackratParsers}
+import _root_.scala.language.implicitConversions
+import _root_.scala.util.parsing.combinator.{JavaTokenParsers, PackratParsers}
 
 /**
  * Parser for expressions inside a String. This parses exactly the same expressions that
@@ -47,26 +46,10 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
   }
 
   // Keyword
-
-  lazy val ARRAY: Keyword = Keyword("Array")
   lazy val AS: Keyword = Keyword("as")
-  lazy val COUNT: Keyword = Keyword("count")
-  lazy val AVG: Keyword = Keyword("avg")
-  lazy val MIN: Keyword = Keyword("min")
-  lazy val MAX: Keyword = Keyword("max")
-  lazy val SUM: Keyword = Keyword("sum")
-  lazy val START: Keyword = Keyword("start")
-  lazy val END: Keyword = Keyword("end")
-  lazy val SUM0: Keyword = Keyword("sum0")
-  lazy val STDDEV_POP: Keyword = Keyword("stddevPop")
-  lazy val STDDEV_SAMP: Keyword = Keyword("stddevSamp")
-  lazy val VAR_POP: Keyword = Keyword("varPop")
-  lazy val VAR_SAMP: Keyword = Keyword("varSamp")
   lazy val CAST: Keyword = Keyword("cast")
   lazy val NULL: Keyword = Keyword("Null")
   lazy val IF: Keyword = Keyword("?")
-  lazy val ASC: Keyword = Keyword("asc")
-  lazy val DESC: Keyword = Keyword("desc")
   lazy val TO_DATE: Keyword = Keyword("toDate")
   lazy val TO_TIME: Keyword = Keyword("toTime")
   lazy val TO_TIMESTAMP: Keyword = Keyword("toTimestamp")
@@ -97,17 +80,10 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
   lazy val CURRENT_RANGE: Keyword = Keyword("current_range")
   lazy val UNBOUNDED_ROW: Keyword = Keyword("unbounded_row")
   lazy val UNBOUNDED_RANGE: Keyword = Keyword("unbounded_range")
-  lazy val ASIN: Keyword = Keyword("asin")
   lazy val ROWTIME: Keyword = Keyword("rowtime")
   lazy val PROCTIME: Keyword = Keyword("proctime")
 
-  def functionIdent: ExpressionParser.Parser[String] =
-    not(ARRAY) ~ not(AS) ~ not(COUNT) ~ not(AVG) ~ not(MIN) ~ not(MAX) ~
-      not(SUM0) ~ not(STDDEV_POP) ~ not(STDDEV_SAMP) ~ not(VAR_POP) ~ not(VAR_SAMP) ~
-      not(SUM) ~ not(START) ~ not(END)~ not(CAST) ~ not(NULL) ~ not(IF) ~
-      not(ROWTIME) ~ not(PROCTIME) ~
-      not(CURRENT_ROW) ~ not(UNBOUNDED_ROW) ~ not(CURRENT_RANGE) ~ not(UNBOUNDED_RANGE) ~>
-      super.ident
+  def functionIdent: ExpressionParser.Parser[String] = super.ident
 
   // symbols
 
@@ -123,29 +99,46 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
     case mode: TrimMode => literal(mode.toString) ^^^ mode.toExpr
   } reduceLeft(_ | _)
 
+  lazy val currentRange: PackratParser[Expression] = CURRENT_RANGE ^^ {
+    _ => CurrentRange()
+  }
+
+  lazy val currentRow: PackratParser[Expression] = CURRENT_ROW ^^ {
+    _ => CurrentRow()
+  }
+
+  lazy val unboundedRange: PackratParser[Expression] = UNBOUNDED_RANGE ^^ {
+    _ => UnboundedRange()
+  }
+
+  lazy val unboundedRow: PackratParser[Expression] = UNBOUNDED_ROW ^^ {
+    _ => UnboundedRow()
+  }
+
+  lazy val overConstant: PackratParser[Expression] =
+    currentRange | currentRow | unboundedRange | unboundedRow
+
   // data types
 
   lazy val dataType: PackratParser[TypeInformation[_]] =
-    "BYTE" ^^ { ti => BasicTypeInfo.BYTE_TYPE_INFO } |
-      "SHORT" ^^ { ti => BasicTypeInfo.SHORT_TYPE_INFO } |
-      "INTERVAL_MONTHS" ^^ {
-        ti => TimeIntervalTypeInfo.INTERVAL_MONTHS.asInstanceOf[TypeInformation[_]]
-      } |
-      "INTERVAL_MILLIS" ^^ {
-        ti => TimeIntervalTypeInfo.INTERVAL_MILLIS.asInstanceOf[TypeInformation[_]]
-      } |
-      "INT" ^^ { ti => BasicTypeInfo.INT_TYPE_INFO } |
-      "LONG" ^^ { ti => BasicTypeInfo.LONG_TYPE_INFO } |
-      "FLOAT" ^^ { ti => BasicTypeInfo.FLOAT_TYPE_INFO } |
-      "DOUBLE" ^^ { ti => BasicTypeInfo.DOUBLE_TYPE_INFO } |
-      ("BOOLEAN" | "BOOL") ^^ { ti => BasicTypeInfo.BOOLEAN_TYPE_INFO } |
-      "STRING" ^^ { ti => BasicTypeInfo.STRING_TYPE_INFO } |
-      "DATE" ^^ { ti => SqlTimeTypeInfo.DATE.asInstanceOf[TypeInformation[_]] } |
-      "TIMESTAMP" ^^ { ti => SqlTimeTypeInfo.TIMESTAMP } |
-      "TIME" ^^ { ti => SqlTimeTypeInfo.TIME } |
-      "DECIMAL" ^^ { ti => BasicTypeInfo.BIG_DEC_TYPE_INFO }
-
-  // Literals
+    "PRIMITIVE_ARRAY" ~ "(" ~> dataType <~ ")" ^^ { ct => Types.PRIMITIVE_ARRAY(ct) } |
+    "OBJECT_ARRAY" ~ "(" ~> dataType <~ ")" ^^ { ct => Types.OBJECT_ARRAY(ct) } |
+    "BYTE" ^^ { e => Types.BYTE } |
+    "SHORT" ^^ { e => Types.SHORT } |
+    "INTERVAL_MONTHS" ^^ { e => Types.INTERVAL_MONTHS } |
+    "INTERVAL_MILLIS" ^^ { e => Types.INTERVAL_MILLIS } |
+    "INT" ^^ { e => Types.INT } |
+    "LONG" ^^ { e => Types.LONG } |
+    "FLOAT" ^^ { e => Types.FLOAT } |
+    "DOUBLE" ^^ { e => Types.DOUBLE } |
+    "BOOLEAN" ^^ { { e => Types.BOOLEAN } } |
+    "STRING" ^^ { e => Types.STRING } |
+    "SQL_DATE" ^^ { e => Types.SQL_DATE } |
+    "SQL_TIMESTAMP" ^^ { e => Types.SQL_TIMESTAMP } |
+    "SQL_TIME" ^^ { e => Types.SQL_TIME } |
+    "DECIMAL" ^^ { e => Types.DECIMAL }
+
+  // literals
 
   // same as floatingPointNumber but we do not allow trailing dot "12.d" or "2."
   lazy val floatingPointNumberFlink: Parser[String] =
@@ -182,25 +175,8 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
     dt => Null(dt)
   }
 
-  // OVER constants
-  lazy val currentRange: PackratParser[Expression] = CURRENT_RANGE ^^ {
-    _ => CurrentRange()
-  }
-  lazy val currentRow: PackratParser[Expression] = CURRENT_ROW ^^ {
-    _ => CurrentRow()
-  }
-  lazy val unboundedRange: PackratParser[Expression] = UNBOUNDED_RANGE ^^ {
-    _ => UnboundedRange()
-  }
-  lazy val unboundedRow: PackratParser[Expression] = UNBOUNDED_ROW ^^ {
-    _ => UnboundedRow()
-  }
-  lazy val overConstant: PackratParser[Expression] =
-    currentRange | currentRow | unboundedRange | unboundedRow
-
   lazy val literalExpr: PackratParser[Expression] =
-    numberLiteral | stringLiteralFlink | singleQuoteStringLiteral | boolLiteral | nullLiteral |
-      overConstant
+    numberLiteral | stringLiteralFlink | singleQuoteStringLiteral | boolLiteral
 
   lazy val fieldReference: PackratParser[NamedExpression] = (STAR | ident) ^^ {
     sym => UnresolvedFieldReference(sym)
@@ -209,93 +185,56 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
   lazy val atom: PackratParser[Expression] =
     ( "(" ~> expression <~ ")" ) | literalExpr | fieldReference
 
-  // suffix operators
-
-  lazy val suffixSum: PackratParser[Expression] =
-    composite <~ "." ~ SUM ~ opt("()") ^^ { e => Sum(e) }
-
-  lazy val suffixSum0: PackratParser[Expression] =
-    composite <~ "." ~ SUM0 ~ opt("()") ^^ { e => Sum0(e) }
-
-  lazy val suffixStddevPop: PackratParser[Expression] =
-    composite <~ "." ~ STDDEV_POP ~ opt("()") ^^ { e => StddevPop(e) }
-
-  lazy val suffixStddevSamp: PackratParser[Expression] =
-    composite <~ "." ~ STDDEV_SAMP ~ opt("()") ^^ { e => StddevSamp(e) }
-
-  lazy val suffixVarSamp: PackratParser[Expression] =
-    composite <~ "." ~ VAR_SAMP ~ opt("()") ^^ { e => VarSamp(e) }
-
-  lazy val suffixVarPop: PackratParser[Expression] =
-    composite <~ "." ~ VAR_POP ~ opt("()") ^^ { e => VarPop(e) }
-
-  lazy val suffixMin: PackratParser[Expression] =
-    composite <~ "." ~ MIN ~ opt("()") ^^ { e => Min(e) }
-
-  lazy val suffixMax: PackratParser[Expression] =
-    composite <~ "." ~ MAX ~ opt("()") ^^ { e => Max(e) }
-
-  lazy val suffixCount: PackratParser[Expression] =
-    composite <~ "." ~ COUNT ~ opt("()") ^^ { e => Count(e) }
-
-  lazy val suffixAvg: PackratParser[Expression] =
-    composite <~ "." ~ AVG ~ opt("()") ^^ { e => Avg(e) }
-
-  lazy val suffixStart: PackratParser[Expression] =
-    composite <~ "." ~ START ~ opt("()") ^^ { e => WindowStart(e) }
+  lazy val over: PackratParser[Expression] = composite ~ OVER ~ fieldReference ^^ {
+    case agg ~ _ ~ windowRef => UnresolvedOverCall(agg, windowRef)
+  }
 
-  lazy val suffixEnd: PackratParser[Expression] =
-    composite <~ "." ~ END ~ opt("()") ^^ { e => WindowEnd(e) }
+  // suffix operators
 
   lazy val suffixCast: PackratParser[Expression] =
     composite ~ "." ~ CAST ~ "(" ~ dataType ~ ")" ^^ {
     case e ~ _ ~ _ ~ _ ~ dt ~ _ => Cast(e, dt)
   }
 
-  lazy val suffixAs: PackratParser[Expression] =
-    composite ~ "." ~ AS ~ "(" ~ rep1sep(fieldReference, ",") ~ ")" ^^ {
-    case e ~ _ ~ _ ~ _ ~ target ~ _ => Alias(e, target.head.name, target.tail.map(_.name))
-  }
-
-  lazy val suffixTrim = composite ~ "." ~ TRIM ~ "(" ~ trimMode ~ "," ~ expression ~ ")" ^^ {
-    case operand ~ _ ~ _ ~ _ ~ mode ~ _ ~ trimCharacter ~ _ => Trim(mode, trimCharacter, operand)
-  }
+  lazy val suffixTrim: PackratParser[Expression] =
+    composite ~ "." ~ TRIM ~ "(" ~ trimMode ~ "," ~ expression ~ ")" ^^ {
+      case operand ~ _ ~ _ ~ _ ~ mode ~ _ ~ trimCharacter ~ _ => Trim(mode, trimCharacter, operand)
+    }
 
-  lazy val suffixTrimWithoutArgs = composite <~ "." ~ TRIM ~ opt("()") ^^ {
-    e => Trim(TrimMode.BOTH, TrimConstants.TRIM_DEFAULT_CHAR, e)
-  }
+  lazy val suffixTrimWithoutArgs: PackratParser[Expression] =
+    composite <~ "." ~ TRIM ~ opt("()") ^^ {
+      e => Trim(TrimMode.BOTH, TrimConstants.TRIM_DEFAULT_CHAR, e)
+    }
 
   lazy val suffixIf: PackratParser[Expression] =
     composite ~ "." ~ IF ~ "(" ~ expression ~ "," ~ expression ~ ")" ^^ {
     case condition ~ _ ~ _ ~ _ ~ ifTrue ~ _ ~ ifFalse ~ _ => If(condition, ifTrue, ifFalse)
   }
 
-  lazy val suffixExtract = composite ~ "." ~ EXTRACT ~ "(" ~ timeIntervalUnit ~ ")" ^^ {
-    case operand ~ _  ~ _ ~ _ ~ unit ~ _ => Extract(unit, operand)
-  }
+  lazy val suffixExtract: PackratParser[Expression] =
+    composite ~ "." ~ EXTRACT ~ "(" ~ timeIntervalUnit ~ ")" ^^ {
+      case operand ~ _  ~ _ ~ _ ~ unit ~ _ => Extract(unit, operand)
+    }
 
-  lazy val suffixFloor = composite ~ "." ~ FLOOR ~ "(" ~ timeIntervalUnit ~ ")" ^^ {
-    case operand ~ _  ~ _ ~ _ ~ unit ~ _ => TemporalFloor(unit, operand)
-  }
+  lazy val suffixFloor: PackratParser[Expression] =
+    composite ~ "." ~ FLOOR ~ "(" ~ timeIntervalUnit ~ ")" ^^ {
+      case operand ~ _  ~ _ ~ _ ~ unit ~ _ => TemporalFloor(unit, operand)
+    }
 
-  lazy val suffixCeil = composite ~ "." ~ CEIL ~ "(" ~ timeIntervalUnit ~ ")" ^^ {
-    case operand ~ _  ~ _ ~ _ ~ unit ~ _ => TemporalCeil(unit, operand)
-  }
+  lazy val suffixCeil: PackratParser[Expression] =
+    composite ~ "." ~ CEIL ~ "(" ~ timeIntervalUnit ~ ")" ^^ {
+      case operand ~ _  ~ _ ~ _ ~ unit ~ _ => TemporalCeil(unit, operand)
+    }
 
-  lazy val suffixFunctionCall =
+  lazy val suffixFunctionCall: PackratParser[Expression] =
     composite ~ "." ~ functionIdent ~ "(" ~ repsep(expression, ",") ~ ")" ^^ {
     case operand ~ _ ~ name ~ _ ~ args ~ _ => Call(name.toUpperCase, operand :: args)
   }
 
-  lazy val suffixFunctionCallOneArg = composite ~ "." ~ functionIdent ^^ {
-    case operand ~ _ ~ name => Call(name.toUpperCase, Seq(operand))
-  }
-
-  lazy val suffixAsc : PackratParser[Expression] =
-    atom <~ "." ~ ASC ~ opt("()") ^^ { e => Asc(e) }
-
-  lazy val suffixDesc : PackratParser[Expression] =
-    atom <~ "." ~ DESC ~ opt("()") ^^ { e => Desc(e) }
+  lazy val suffixFunctionCallOneArg: PackratParser[Expression] =
+    composite ~ "." ~ functionIdent ^^ {
+      case operand ~ _ ~ name => Call(name.toUpperCase, Seq(operand))
+    }
 
   lazy val suffixToDate: PackratParser[Expression] =
     composite <~ "." ~ TO_DATE ~ opt("()") ^^ { e => Cast(e, SqlTimeTypeInfo.DATE) }
@@ -337,103 +276,71 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
   lazy val suffixFlattening: PackratParser[Expression] =
     composite <~ "." ~ FLATTEN ~ opt("()") ^^ { e => Flattening(e) }
 
-  lazy val suffixAgg: PackratParser[Expression] =
-    suffixSum0 | suffixStddevPop | suffixStddevSamp | suffixVarPop | suffixVarSamp |
-      suffixSum | suffixMin | suffixMax | suffixCount | suffixAvg
-
-  lazy val suffixAsin: PackratParser[Expression] =
-    composite <~ "." ~ ASIN ~ opt("()") ^^ { e => Asin(e) }
+  lazy val suffixAs: PackratParser[Expression] =
+    composite ~ "." ~ AS ~ "(" ~ rep1sep(fieldReference, ",") ~ ")" ^^ {
+      case e ~ _ ~ _ ~ _ ~ target ~ _ => Alias(e, target.head.name, target.tail.map(_.name))
+  }
 
   lazy val suffixed: PackratParser[Expression] =
-    suffixTimeInterval | suffixRowInterval | suffixStart | suffixEnd | suffixAgg |
-      suffixCast | suffixAs | suffixTrim | suffixTrimWithoutArgs | suffixIf | suffixAsc |
-      suffixDesc | suffixToDate | suffixToTimestamp | suffixToTime | suffixExtract |
-      suffixFloor | suffixCeil | suffixGet | suffixFlattening | suffixAsin |
-      suffixFunctionCall | suffixFunctionCallOneArg // function call must always be at the end
+    // expressions that need to be resolved early
+    suffixFlattening |
+    // expressions that need special expression conversion
+    suffixAs | suffixTimeInterval | suffixRowInterval | suffixToTimestamp | suffixToTime |
+    suffixToDate |
+    // expressions that take enumerations
+    suffixCast | suffixTrim | suffixTrimWithoutArgs | suffixExtract | suffixFloor | suffixCeil |
+    // expressions that take literals
+    suffixGet |
+    // expression with special identifier
+    suffixIf |
+    // function call must always be at the end
+    suffixFunctionCall | suffixFunctionCallOneArg
 
   // prefix operators
 
-  lazy val prefixArray: PackratParser[Expression] =
-    ARRAY ~ "(" ~> repsep(expression, ",") <~ ")" ^^ { elements => ArrayConstructor(elements) }
-
-  lazy val prefixSum: PackratParser[Expression] =
-    SUM ~ "(" ~> expression <~ ")" ^^ { e => Sum(e) }
-
-  lazy val prefixSum0: PackratParser[Expression] =
-    SUM0 ~ "(" ~> expression <~ ")" ^^ { e => Sum0(e) }
-
-  lazy val prefixStddevPop: PackratParser[Expression] =
-    STDDEV_POP ~ "(" ~> expression <~ ")" ^^ { e => StddevPop(e) }
-
-  lazy val prefixStddevSamp: PackratParser[Expression] =
-    STDDEV_SAMP ~ "(" ~> expression <~ ")" ^^ { e => StddevSamp(e) }
-
-  lazy val prefixVarPop: PackratParser[Expression] =
-    VAR_POP ~ "(" ~> expression <~ ")" ^^ { e => VarPop(e) }
-
-  lazy val prefixVarSamp: PackratParser[Expression] =
-    VAR_SAMP ~ "(" ~> expression <~ ")" ^^ { e => VarSamp(e) }
-
-  lazy val prefixMin: PackratParser[Expression] =
-    MIN ~ "(" ~> expression <~ ")" ^^ { e => Min(e) }
-
-  lazy val prefixMax: PackratParser[Expression] =
-    MAX ~ "(" ~> expression <~ ")" ^^ { e => Max(e) }
-
-  lazy val prefixCount: PackratParser[Expression] =
-    COUNT ~ "(" ~> expression <~ ")" ^^ { e => Count(e) }
-
-  lazy val prefixAvg: PackratParser[Expression] =
-    AVG ~ "(" ~> expression <~ ")" ^^ { e => Avg(e) }
-
-  lazy val prefixStart: PackratParser[Expression] =
-    START ~ "(" ~> expression <~ ")" ^^ { e => WindowStart(e) }
-
-  lazy val prefixEnd: PackratParser[Expression] =
-    END ~ "(" ~> expression <~ ")" ^^ { e => WindowEnd(e) }
-
   lazy val prefixCast: PackratParser[Expression] =
     CAST ~ "(" ~ expression ~ "," ~ dataType ~ ")" ^^ {
     case _ ~ _ ~ e ~ _ ~ dt ~ _ => Cast(e, dt)
   }
 
-  lazy val prefixAs: PackratParser[Expression] =
-    AS ~ "(" ~ expression ~ "," ~ rep1sep(fieldReference, ",") ~ ")" ^^ {
-    case _ ~ _ ~ e ~ _ ~ target ~ _ => Alias(e, target.head.name, target.tail.map(_.name))
-  }
-
   lazy val prefixIf: PackratParser[Expression] =
       IF ~ "(" ~ expression ~ "," ~ expression ~ "," ~ expression ~ ")" ^^ {
     case _ ~ _ ~ condition ~ _ ~ ifTrue ~ _ ~ ifFalse ~ _ => If(condition, ifTrue, ifFalse)
   }
 
-  lazy val prefixFunctionCall = functionIdent ~ "(" ~ repsep(expression, ",") ~ ")" ^^ {
-    case name ~ _ ~ args ~ _ => Call(name.toUpperCase, args)
-  }
+  lazy val prefixFunctionCall: PackratParser[Expression] =
+    functionIdent ~ "(" ~ repsep(expression, ",") ~ ")" ^^ {
+      case name ~ _ ~ args ~ _ => Call(name.toUpperCase, args)
+    }
 
-  lazy val prefixFunctionCallOneArg = functionIdent ~ "(" ~ expression ~ ")" ^^ {
-    case name ~ _ ~ arg ~ _ => Call(name.toUpperCase, Seq(arg))
-  }
+  lazy val prefixFunctionCallOneArg: PackratParser[Expression] =
+    functionIdent ~ "(" ~ expression ~ ")" ^^ {
+      case name ~ _ ~ arg ~ _ => Call(name.toUpperCase, Seq(arg))
+    }
 
-  lazy val prefixTrim = TRIM ~ "(" ~ trimMode ~ "," ~ expression ~ "," ~ expression ~ ")" ^^ {
-    case _ ~ _ ~ mode ~ _ ~ trimCharacter ~ _ ~ operand ~ _ => Trim(mode, trimCharacter, operand)
-  }
+  lazy val prefixTrim: PackratParser[Expression] =
+    TRIM ~ "(" ~ trimMode ~ "," ~ expression ~ "," ~ expression ~ ")" ^^ {
+      case _ ~ _ ~ mode ~ _ ~ trimCharacter ~ _ ~ operand ~ _ => Trim(mode, trimCharacter, operand)
+    }
 
-  lazy val prefixTrimWithoutArgs = TRIM ~ "(" ~ expression ~ ")" ^^ {
+  lazy val prefixTrimWithoutArgs: PackratParser[Expression] = TRIM ~ "(" ~ expression ~ ")" ^^ {
     case _ ~ _ ~ operand ~ _ => Trim(TrimMode.BOTH, TrimConstants.TRIM_DEFAULT_CHAR, operand)
   }
 
-  lazy val prefixExtract = EXTRACT ~ "(" ~ expression ~ "," ~ timeIntervalUnit ~ ")" ^^ {
-    case _ ~ _ ~ operand ~ _ ~ unit ~ _ => Extract(unit, operand)
-  }
+  lazy val prefixExtract: PackratParser[Expression] =
+    EXTRACT ~ "(" ~ expression ~ "," ~ timeIntervalUnit ~ ")" ^^ {
+      case _ ~ _ ~ operand ~ _ ~ unit ~ _ => Extract(unit, operand)
+    }
 
-  lazy val prefixFloor = FLOOR ~ "(" ~ expression ~ "," ~ timeIntervalUnit ~ ")" ^^ {
-    case _ ~ _ ~ operand ~ _ ~ unit ~ _ => TemporalFloor(unit, operand)
-  }
+  lazy val prefixFloor: PackratParser[Expression] =
+    FLOOR ~ "(" ~ expression ~ "," ~ timeIntervalUnit ~ ")" ^^ {
+      case _ ~ _ ~ operand ~ _ ~ unit ~ _ => TemporalFloor(unit, operand)
+    }
 
-  lazy val prefixCeil = CEIL ~ "(" ~ expression ~ "," ~ timeIntervalUnit ~ ")" ^^ {
-    case _ ~ _ ~ operand ~ _ ~ unit ~ _ => TemporalCeil(unit, operand)
-  }
+  lazy val prefixCeil: PackratParser[Expression] =
+    CEIL ~ "(" ~ expression ~ "," ~ timeIntervalUnit ~ ")" ^^ {
+      case _ ~ _ ~ operand ~ _ ~ unit ~ _ => TemporalCeil(unit, operand)
+    }
 
   lazy val prefixGet: PackratParser[Expression] =
     GET ~ "(" ~ composite ~ ","  ~ literalExpr ~ ")" ^^ {
@@ -444,30 +351,37 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
   lazy val prefixFlattening: PackratParser[Expression] =
     FLATTEN ~ "(" ~> composite <~ ")" ^^ { e => Flattening(e) }
 
-  lazy val prefixAgg: PackratParser[Expression] =
-    prefixSum0 | prefixStddevPop | prefixStddevSamp | prefixVarPop | prefixVarSamp |
-      prefixSum | prefixMin | prefixMax | prefixCount | prefixAvg
+  lazy val prefixToDate: PackratParser[Expression] =
+    TO_DATE ~ "(" ~> expression <~ ")" ^^ { e => Cast(e, SqlTimeTypeInfo.DATE) }
 
-  lazy val prefixAsin: PackratParser[Expression] =
-    ASIN ~ "(" ~> composite <~ ")" ^^ { e => Asin(e) }
+  lazy val prefixToTimestamp: PackratParser[Expression] =
+    TO_TIMESTAMP ~ "(" ~> expression <~ ")" ^^ { e => Cast(e, SqlTimeTypeInfo.TIMESTAMP) }
 
-  lazy val prefixed: PackratParser[Expression] =
-    prefixArray | prefixAgg | prefixStart | prefixEnd | prefixCast | prefixAs | prefixTrim |
-      prefixTrimWithoutArgs | prefixIf | prefixExtract | prefixFloor | prefixCeil | prefixGet |
-      prefixFlattening | prefixAsin | prefixFunctionCall |
-      prefixFunctionCallOneArg // function call must always be at the end
-
-  // over
+  lazy val prefixToTime: PackratParser[Expression] =
+    TO_TIME ~ "(" ~> expression <~ ")" ^^ { e => Cast(e, SqlTimeTypeInfo.TIME) }
 
-  lazy val over: PackratParser[Expression] = suffixAgg ~ OVER ~ fieldReference ^^ {
-    case agg ~ _ ~ windowRef => UnresolvedOverCall(agg, windowRef)
-  } | prefixAgg ~ OVER ~ fieldReference ^^ {
-    case agg ~ _ ~ windowRef => UnresolvedOverCall(agg, windowRef)
+  lazy val prefixAs: PackratParser[Expression] =
+    AS ~ "(" ~ expression ~ "," ~ rep1sep(fieldReference, ",") ~ ")" ^^ {
+    case _ ~ _ ~ e ~ _ ~ target ~ _ => Alias(e, target.head.name, target.tail.map(_.name))
   }
 
+  lazy val prefixed: PackratParser[Expression] =
+    // expressions that need to be resolved early
+    prefixFlattening |
+    // expressions that need special expression conversion
+    prefixAs| prefixToTimestamp | prefixToTime | prefixToDate |
+    // expressions that take enumerations
+    prefixCast | prefixTrim | prefixTrimWithoutArgs | prefixExtract | prefixFloor | prefixCeil |
+    // expressions that take literals
+    prefixGet |
+    // expression with special identifier
+    prefixIf |
+    // function call must always be at the end
+    prefixFunctionCall | prefixFunctionCallOneArg
+
   // suffix/prefix composite
 
-  lazy val composite: PackratParser[Expression] = suffixed | over | prefixed | atom |
+  lazy val composite: PackratParser[Expression] = over | nullLiteral | suffixed | prefixed | atom |
     failure("Composite expression expected.")
 
   // unary ops
@@ -478,23 +392,23 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
 
   lazy val unaryPlus: PackratParser[Expression] = "+" ~> composite ^^ { e => e }
 
-  lazy val unary = composite | unaryNot | unaryMinus | unaryPlus |
+  lazy val unary: PackratParser[Expression] = composite | unaryNot | unaryMinus | unaryPlus |
     failure("Unary expression expected.")
 
   // arithmetic
 
-  lazy val product = unary * (
+  lazy val product: PackratParser[Expression] = unary * (
     "*" ^^^ { (a:Expression, b:Expression) => Mul(a,b) } |
     "/" ^^^ { (a:Expression, b:Expression) => Div(a,b) } |
     "%" ^^^ { (a:Expression, b:Expression) => Mod(a,b) } ) |
     failure("Product expected.")
 
-  lazy val term = product * (
+  lazy val term: PackratParser[Expression] = product * (
     "+" ^^^ { (a:Expression, b:Expression) => Plus(a,b) } |
     "-" ^^^ { (a:Expression, b:Expression) => Minus(a,b) } ) |
     failure("Term expected.")
 
-  // Comparison
+  // comparison
 
   lazy val equalTo: PackratParser[Expression] = term ~ ("===" | "==" | "=") ~ term ^^ {
     case l ~ _ ~ r => EqualTo(l, r)
@@ -528,33 +442,32 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
 
   // logic
 
-  lazy val logic = comparison * (
+  lazy val logic: PackratParser[Expression] = comparison * (
     "&&" ^^^ { (a:Expression, b:Expression) => And(a,b) } |
     "||" ^^^ { (a:Expression, b:Expression) => Or(a,b) } ) |
     failure("Logic expected.")
 
-  // alias
-
-  lazy val alias: PackratParser[Expression] = timeIndicator |
-    logic ~ AS ~ fieldReference ^^ {
-      case e ~ _ ~ name => Alias(e, name.name)
-  } | logic ~ AS ~ "(" ~ rep1sep(fieldReference, ",") ~ ")" ^^ {
-    case e ~ _ ~ _ ~ names ~ _ => Alias(e, names.head.name, names.tail.map(_.name))
-  } | logic
-
   // time indicators
 
-  lazy val timeIndicator: PackratParser[Expression] = procTime | rowTime
+  lazy val timeIndicator: PackratParser[Expression] = proctime | rowtime
 
-  lazy val procTime: PackratParser[Expression] = fieldReference ~ "." ~ PROCTIME ^^ {
+  lazy val proctime: PackratParser[Expression] = fieldReference ~ "." ~ PROCTIME ^^ {
     case f ~ _ ~ _ => ProctimeAttribute(f)
   }
 
-  lazy val rowTime: PackratParser[Expression] = fieldReference ~ "." ~ ROWTIME ^^ {
+  lazy val rowtime: PackratParser[Expression] = fieldReference ~ "." ~ ROWTIME ^^ {
     case f ~ _ ~ _ => RowtimeAttribute(f)
   }
 
-  lazy val expression: PackratParser[Expression] = alias |
+  // alias
+
+  lazy val alias: PackratParser[Expression] = logic ~ AS ~ fieldReference ^^ {
+      case e ~ _ ~ name => Alias(e, name.name)
+  } | logic ~ AS ~ "(" ~ rep1sep(fieldReference, ",") ~ ")" ^^ {
+    case e ~ _ ~ _ ~ names ~ _ => Alias(e, names.head.name, names.tail.map(_.name))
+  } | logic
+
+  lazy val expression: PackratParser[Expression] = timeIndicator | overConstant | alias |
     failure("Invalid expression.")
 
   lazy val expressionList: Parser[List[Expression]] = rep1sep(expression, ",")

http://git-wip-us.apache.org/repos/asf/flink/blob/9244106b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala
index 98a7e63..802768e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala
@@ -338,7 +338,7 @@ object ProjectionTranslator {
       // Functions calls
       case c @ Call(name, args) =>
         val function = tableEnv.getFunctionCatalog.lookupFunction(name, args)
-        if (function.isInstanceOf[AggFunctionCall]) {
+        if (function.isInstanceOf[AggFunctionCall] || function.isInstanceOf[Aggregation]) {
           function
         } else {
           val newArgs =

http://git-wip-us.apache.org/repos/asf/flink/blob/9244106b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
index 31ad558..4d945a7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
@@ -21,14 +21,14 @@ package org.apache.flink.table.validate
 import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.calcite.sql.util.{ChainedSqlOperatorTable, ListSqlOperatorTable, ReflectiveSqlOperatorTable}
 import org.apache.calcite.sql.{SqlFunction, SqlOperator, SqlOperatorTable}
-import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.api._
 import org.apache.flink.table.expressions._
 import org.apache.flink.table.functions.utils.{AggSqlFunction, ScalarSqlFunction, TableSqlFunction}
 import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableFunction}
 
-import scala.collection.JavaConversions._
-import scala.collection.mutable
-import scala.util.{Failure, Success, Try}
+import _root_.scala.collection.JavaConversions._
+import _root_.scala.collection.mutable
+import _root_.scala.util.{Failure, Success, Try}
 
 /**
   * A catalog for looking up (user-defined) functions, used during validation phases
@@ -150,10 +150,6 @@ object FunctionCatalog {
 
   val builtInFunctions: Map[String, Class[_]] = Map(
 
-//    SqlStdOperatorTable.AS,
-//    SqlStdOperatorTable.DIVIDE_INTEGER,
-//    SqlStdOperatorTable.DOT,
-
     // logic
     "and" -> classOf[And],
     "or" -> classOf[Or],
@@ -194,7 +190,6 @@ object FunctionCatalog {
     "similar" -> classOf[Similar],
     "substring" -> classOf[Substring],
     "trim" -> classOf[Trim],
-    // duplicate functions for calcite
     "upper" -> classOf[Upper],
     "upperCase" -> classOf[Upper],
     "position" -> classOf[Position],
@@ -240,13 +235,18 @@ object FunctionCatalog {
     "dateTimePlus" -> classOf[Plus],
 
     // array
+    "array" -> classOf[ArrayConstructor],
     "cardinality" -> classOf[ArrayCardinality],
     "at" -> classOf[ArrayElementAt],
-    "element" -> classOf[ArrayElement]
+    "element" -> classOf[ArrayElement],
+
+    // window properties
+    "start" -> classOf[WindowStart],
+    "end" -> classOf[WindowEnd],
 
-    // TODO implement function overloading here
-    // "floor" -> classOf[TemporalFloor]
-    // "ceil" -> classOf[TemporalCeil]
+    // ordering
+    "asc" -> classOf[Asc],
+    "desc" -> classOf[Desc]
   )
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/9244106b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/CastingStringExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/CastingStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/CastingStringExpressionTest.scala
index 19d27fe..8a2db41 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/CastingStringExpressionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/CastingStringExpressionTest.scala
@@ -86,13 +86,13 @@ class CastingStringExpressionTest {
       // * -> String
       "_1.cast(STRING), _2.cast(STRING), _3.cast(STRING), _4.cast(STRING)," +
         // NUMERIC TYPE -> Boolean
-        "_1.cast(BOOL), _2.cast(BOOL), _3.cast(BOOL)," +
+        "_1.cast(BOOLEAN), _2.cast(BOOLEAN), _3.cast(BOOLEAN)," +
         // NUMERIC TYPE -> NUMERIC TYPE
         "_1.cast(DOUBLE), _2.cast(INT), _3.cast(SHORT)," +
         // Boolean -> NUMERIC TYPE
         "_4.cast(DOUBLE)," +
         // identity casting
-        "_1.cast(INT), _2.cast(DOUBLE), _3.cast(LONG), _4.cast(BOOL)")
+        "_1.cast(INT), _2.cast(DOUBLE), _3.cast(LONG), _4.cast(BOOLEAN)")
 
     val lPlan1 = t1.logicalPlan
     val lPlan2 = t2.logicalPlan
@@ -110,7 +110,7 @@ class CastingStringExpressionTest {
         '_3.cast(DOUBLE), '_3.cast(FLOAT), '_2.cast(BOOLEAN))
     val t2 = table.select(
       "_1.cast(BYTE), _1.cast(SHORT), _1.cast(INT), _1.cast(LONG), " +
-        "_3.cast(DOUBLE), _3.cast(FLOAT), _2.cast(BOOL)")
+        "_3.cast(DOUBLE), _3.cast(FLOAT), _2.cast(BOOLEAN)")
 
     val lPlan1 = t1.logicalPlan
     val lPlan2 = t2.logicalPlan

http://git-wip-us.apache.org/repos/asf/flink/blob/9244106b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/DecimalTypeTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/DecimalTypeTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/DecimalTypeTest.scala
index bdc239d..42f8008 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/DecimalTypeTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/DecimalTypeTest.scala
@@ -154,12 +154,12 @@ class DecimalTypeTest extends ExpressionTestBase {
     // to boolean (not SQL compliant)
     testTableApi(
       'f1.cast(Types.BOOLEAN),
-      "f1.cast(BOOL)",
+      "f1.cast(BOOLEAN)",
       "true")
 
     testTableApi(
       'f5.cast(Types.BOOLEAN),
-      "f5.cast(BOOL)",
+      "f5.cast(BOOLEAN)",
       "false")
 
     testTableApi(

http://git-wip-us.apache.org/repos/asf/flink/blob/9244106b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
index c1c2508..76c02c0 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
@@ -1409,7 +1409,7 @@ class ScalarFunctionsTest extends ExpressionTestBase {
 
     testAllApis(
       temporalOverlaps("9:00:00".toTime, "9:30:00".toTime, "9:29:00".toTime, "9:31:00".toTime),
-      "temporalOverlaps('9:00:00'.toTime, '9:30:00'.toTime, '9:29:00'.toTime, '9:31:00'.toTime)",
+      "temporalOverlaps(toTime('9:00:00'), '9:30:00'.toTime, '9:29:00'.toTime, '9:31:00'.toTime)",
       "(TIME '9:00:00', TIME '9:30:00') OVERLAPS (TIME '9:29:00', TIME '9:31:00')",
       "true")
 
@@ -1421,14 +1421,14 @@ class ScalarFunctionsTest extends ExpressionTestBase {
 
     testAllApis(
       temporalOverlaps("2011-03-10".toDate, 10.days, "2011-03-19".toDate, 10.days),
-      "temporalOverlaps('2011-03-10'.toDate, 10.days, '2011-03-19'.toDate, 10.days)",
+      "temporalOverlaps(toDate('2011-03-10'), 10.days, '2011-03-19'.toDate, 10.days)",
       "(DATE '2011-03-10', INTERVAL '10' DAY) OVERLAPS (DATE '2011-03-19', INTERVAL '10' DAY)",
       "true")
 
     testAllApis(
       temporalOverlaps("2011-03-10 05:02:02".toTimestamp, 0.milli,
         "2011-03-10 05:02:02".toTimestamp, "2011-03-10 05:02:01".toTimestamp),
-      "temporalOverlaps('2011-03-10 05:02:02'.toTimestamp, 0.milli, " +
+      "temporalOverlaps(toTimestamp('2011-03-10 05:02:02'), 0.milli, " +
         "'2011-03-10 05:02:02'.toTimestamp, '2011-03-10 05:02:01'.toTimestamp)",
       "(TIMESTAMP '2011-03-10 05:02:02', INTERVAL '0' SECOND) OVERLAPS " +
         "(TIMESTAMP '2011-03-10 05:02:02', TIMESTAMP '2011-03-10 05:02:01')",

http://git-wip-us.apache.org/repos/asf/flink/blob/9244106b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/TemporalTypesTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/TemporalTypesTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/TemporalTypesTest.scala
index 58577be..4f47fd5 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/TemporalTypesTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/TemporalTypesTest.scala
@@ -45,7 +45,7 @@ class TemporalTypesTest extends ExpressionTestBase {
 
     testAllApis(
       "1500-04-30".cast(Types.SQL_DATE),
-      "'1500-04-30'.cast(DATE)",
+      "'1500-04-30'.cast(SQL_DATE)",
       "CAST('1500-04-30' AS DATE)",
       "1500-04-30")
 
@@ -62,7 +62,7 @@ class TemporalTypesTest extends ExpressionTestBase {
 
     testAllApis(
       "1:30:00".cast(Types.SQL_TIME),
-      "'1:30:00'.cast(TIME)",
+      "'1:30:00'.cast(SQL_TIME)",
       "CAST('1:30:00' AS TIME)",
       "01:30:00")
 
@@ -79,7 +79,7 @@ class TemporalTypesTest extends ExpressionTestBase {
 
     testAllApis(
       "1500-04-30 12:00:00".cast(Types.SQL_TIMESTAMP),
-      "'1500-04-30 12:00:00'.cast(TIMESTAMP)",
+      "'1500-04-30 12:00:00'.cast(SQL_TIMESTAMP)",
       "CAST('1500-04-30 12:00:00' AS TIMESTAMP)",
       "1500-04-30 12:00:00.0")
   }
@@ -169,62 +169,62 @@ class TemporalTypesTest extends ExpressionTestBase {
   def testTimePointCasting(): Unit = {
     testAllApis(
       'f0.cast(Types.SQL_TIMESTAMP),
-      "f0.cast(TIMESTAMP)",
+      "f0.cast(SQL_TIMESTAMP)",
       "CAST(f0 AS TIMESTAMP)",
       "1990-10-14 00:00:00.0")
 
     testAllApis(
       'f1.cast(Types.SQL_TIMESTAMP),
-      "f1.cast(TIMESTAMP)",
+      "f1.cast(SQL_TIMESTAMP)",
       "CAST(f1 AS TIMESTAMP)",
       "1970-01-01 10:20:45.0")
 
     testAllApis(
       'f2.cast(Types.SQL_DATE),
-      "f2.cast(DATE)",
+      "f2.cast(SQL_DATE)",
       "CAST(f2 AS DATE)",
       "1990-10-14")
 
     testAllApis(
       'f2.cast(Types.SQL_TIME),
-      "f2.cast(TIME)",
+      "f2.cast(SQL_TIME)",
       "CAST(f2 AS TIME)",
       "10:20:45")
 
     testAllApis(
       'f2.cast(Types.SQL_TIME),
-      "f2.cast(TIME)",
+      "f2.cast(SQL_TIME)",
       "CAST(f2 AS TIME)",
       "10:20:45")
 
     testTableApi(
       'f7.cast(Types.SQL_DATE),
-      "f7.cast(DATE)",
+      "f7.cast(SQL_DATE)",
       "2002-11-09")
 
     testTableApi(
       'f7.cast(Types.SQL_DATE).cast(Types.INT),
-      "f7.cast(DATE).cast(INT)",
+      "f7.cast(SQL_DATE).cast(INT)",
       "12000")
 
     testTableApi(
       'f7.cast(Types.SQL_TIME),
-      "f7.cast(TIME)",
+      "f7.cast(SQL_TIME)",
       "00:00:12")
 
     testTableApi(
       'f7.cast(Types.SQL_TIME).cast(Types.INT),
-      "f7.cast(TIME).cast(INT)",
+      "f7.cast(SQL_TIME).cast(INT)",
       "12000")
 
     testTableApi(
       'f8.cast(Types.SQL_TIMESTAMP),
-      "f8.cast(TIMESTAMP)",
+      "f8.cast(SQL_TIMESTAMP)",
       "2016-06-27 07:23:33.0")
 
     testTableApi(
       'f8.cast(Types.SQL_TIMESTAMP).cast(Types.LONG),
-      "f8.cast(TIMESTAMP).cast(LONG)",
+      "f8.cast(SQL_TIMESTAMP).cast(LONG)",
       "1467012213000")
   }
 
@@ -263,13 +263,13 @@ class TemporalTypesTest extends ExpressionTestBase {
 
     testAllApis(
       'f0.cast(Types.SQL_TIMESTAMP) !== 'f2,
-      "f0.cast(TIMESTAMP) !== f2",
+      "f0.cast(SQL_TIMESTAMP) !== f2",
       "CAST(f0 AS TIMESTAMP) <> f2",
       "true")
 
     testAllApis(
       'f0.cast(Types.SQL_TIMESTAMP) === 'f6,
-      "f0.cast(TIMESTAMP) === f6",
+      "f0.cast(SQL_TIMESTAMP) === f6",
       "CAST(f0 AS TIMESTAMP) = f6",
       "true")
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/9244106b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala
index 1e7d580..f0432cf 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala
@@ -64,6 +64,31 @@ class UserDefinedScalarFunctionTest extends ExpressionTestBase {
       "Func6(f4, f5, f6)",
       "Func6(f4, f5, f6)",
       "(1990-10-14,12:10:10,1990-10-14 12:10:10.0)")
+
+    // function names containing keywords
+    testAllApis(
+      Func0('f0),
+      "getFunc0(f0)",
+      "getFunc0(f0)",
+      "42")
+
+    testAllApis(
+      Func0('f0),
+      "asAlways(f0)",
+      "asAlways(f0)",
+      "42")
+
+    testAllApis(
+      Func0('f0),
+      "toWhatever(f0)",
+      "toWhatever(f0)",
+      "42")
+
+    testAllApis(
+      Func0('f0),
+      "Nullable(f0)",
+      "Nullable(f0)",
+      "42")
   }
 
   @Test
@@ -286,7 +311,7 @@ class UserDefinedScalarFunctionTest extends ExpressionTestBase {
 
     testAllApis(
       JavaFunc1(Null(Types.SQL_TIME), 15, Null(Types.SQL_TIMESTAMP)),
-      "JavaFunc1(Null(TIME), 15, Null(TIMESTAMP))",
+      "JavaFunc1(Null(SQL_TIME), 15, Null(SQL_TIMESTAMP))",
       "JavaFunc1(NULL, 15, NULL)",
       "null and 15 and null")
 
@@ -358,6 +383,10 @@ class UserDefinedScalarFunctionTest extends ExpressionTestBase {
 
   override def functions: Map[String, ScalarFunction] = Map(
     "Func0" -> Func0,
+    "getFunc0" -> Func0,
+    "asAlways" -> Func0,
+    "toWhatever" -> Func0,
+    "Nullable" -> Func0,
     "Func1" -> Func1,
     "Func2" -> Func2,
     "Func3" -> Func3,

http://git-wip-us.apache.org/repos/asf/flink/blob/9244106b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/util/RexProgramExtractorTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/util/RexProgramExtractorTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/util/RexProgramExtractorTest.scala
index 999d20f..5d5eece 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/util/RexProgramExtractorTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/util/RexProgramExtractorTest.scala
@@ -24,7 +24,7 @@ import org.apache.calcite.rex.{RexBuilder, RexProgram, RexProgramBuilder}
 import org.apache.calcite.sql.SqlPostfixOperator
 import org.apache.calcite.sql.`type`.SqlTypeName.{BIGINT, INTEGER, VARCHAR}
 import org.apache.calcite.sql.fun.SqlStdOperatorTable
-import org.apache.flink.table.expressions.{Expression, ExpressionParser}
+import org.apache.flink.table.expressions._
 import org.apache.flink.table.utils.InputTypeBuilder.inputOf
 import org.apache.flink.table.validate.FunctionCatalog
 import org.hamcrest.CoreMatchers.is
@@ -253,8 +253,8 @@ class RexProgramExtractorTest extends RexProgramTestBase {
         functionCatalog)
 
     val expected: Array[Expression] = Array(
-      ExpressionParser.parseExpression("sum(amount) > 100"),
-      ExpressionParser.parseExpression("min(id) == 100")
+      GreaterThan(Sum(UnresolvedFieldReference("amount")), Literal(100)),
+      EqualTo(Min(UnresolvedFieldReference("id")), Literal(100))
     )
     assertExpressionArrayEquals(expected, convertedExpressions)
     assertEquals(0, unconvertedRexNodes.length)


Mime
View raw message