flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [1/5] flink git commit: [FLINK-7410] [table] Use UserDefinedFunction.toString() to display operator names of UDFs.
Date Tue, 10 Oct 2017 21:09:57 GMT
Repository: flink
Updated Branches:
  refs/heads/master 9829ca00d -> 427dfe42e


[FLINK-7410] [table] Use UserDefinedFunction.toString() to display operator names of UDFs.

This closes #4624.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/427dfe42
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/427dfe42
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/427dfe42

Branch: refs/heads/master
Commit: 427dfe42e2bea891b40e662bc97cdea57cdae3f5
Parents: dccdba1
Author: 军长 <hequn.chq@alibaba-inc.com>
Authored: Wed Aug 30 19:30:52 2017 +0800
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Tue Oct 10 23:09:07 2017 +0200

----------------------------------------------------------------------
 .../flink/table/api/TableEnvironment.scala      |  7 +++--
 .../flink/table/expressions/aggregations.scala  |  3 +-
 .../apache/flink/table/expressions/call.scala   |  1 +
 .../flink/table/functions/ScalarFunction.scala  |  2 --
 .../flink/table/functions/TableFunction.scala   |  2 --
 .../table/functions/UserDefinedFunction.scala   |  8 ++++-
 .../table/functions/utils/AggSqlFunction.scala  | 14 ++++++++-
 .../functions/utils/ScalarSqlFunction.scala     |  4 +++
 .../functions/utils/TableSqlFunction.scala      |  3 ++
 .../utils/UserDefinedFunctionUtils.scala        |  8 +++--
 .../flink/table/plan/logical/operators.scala    |  1 +
 .../table/plan/nodes/CommonCorrelate.scala      | 20 ++++++++----
 .../plan/nodes/dataset/DataSetCorrelate.scala   | 20 +++++++++---
 .../nodes/datastream/DataStreamCorrelate.scala  | 16 ++++++++--
 .../plan/rules/logical/LogicalUnnestRule.scala  |  1 +
 .../utils/JavaUserDefinedAggFunctions.java      |  5 +++
 .../flink/table/api/TableSourceTest.scala       |  2 +-
 .../table/api/batch/sql/CorrelateTest.scala     | 30 ++++++++++++------
 .../flink/table/api/batch/table/CalcTest.scala  | 12 +++----
 .../table/api/batch/table/CorrelateTest.scala   | 10 +++---
 .../table/api/batch/table/GroupWindowTest.scala |  6 ++--
 .../table/api/stream/sql/CorrelateTest.scala    | 30 ++++++++++++------
 .../table/api/stream/table/CorrelateTest.scala  | 33 +++++++++++++-------
 .../api/stream/table/GroupWindowTest.scala      |  6 ++--
 .../table/api/stream/table/OverWindowTest.scala |  3 +-
 .../plan/ExpressionReductionRulesTest.scala     |  2 +-
 .../plan/TimeIndicatorConversionTest.scala      |  3 +-
 .../table/runtime/stream/table/CalcITCase.scala |  2 +-
 28 files changed, 176 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/427dfe42/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index dc82a87..54877ba 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -331,7 +331,9 @@ abstract class TableEnvironment(val config: TableConfig) {
     functionCatalog.registerFunction(name, function.getClass)
 
     // register in SQL API
-    functionCatalog.registerSqlFunction(createScalarSqlFunction(name, function, typeFactory))
+    functionCatalog.registerSqlFunction(
+      createScalarSqlFunction(name, name, function, typeFactory)
+    )
   }
 
   /**
@@ -355,7 +357,7 @@ abstract class TableEnvironment(val config: TableConfig) {
     functionCatalog.registerFunction(name, function.getClass)
 
     // register in SQL API
-    val sqlFunction = createTableSqlFunction(name, function, typeInfo, typeFactory)
+    val sqlFunction = createTableSqlFunction(name, name, function, typeInfo, typeFactory)
     functionCatalog.registerSqlFunction(sqlFunction)
   }
 
@@ -384,6 +386,7 @@ abstract class TableEnvironment(val config: TableConfig) {
     // register in SQL API
     val sqlFunctions = createAggregateSqlFunction(
       name,
+      name,
       function,
       resultTypeInfo,
       accTypeInfo,

http://git-wip-us.apache.org/repos/asf/flink/blob/427dfe42/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala
index c2d1bdf..1ffcb12 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala
@@ -261,7 +261,8 @@ case class AggFunctionCall(
   override private[flink] def getSqlAggFunction()(implicit relBuilder: RelBuilder) = {
     val typeFactory = relBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory]
     AggSqlFunction(
-      aggregateFunction.getClass.getSimpleName,
+      aggregateFunction.functionIdentifier,
+      aggregateFunction.toString,
       aggregateFunction,
       resultType,
       accTypeInfo,

http://git-wip-us.apache.org/repos/asf/flink/blob/427dfe42/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
index cad9ccc..8454555 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
@@ -272,6 +272,7 @@ case class ScalarFunctionCall(
     relBuilder.call(
       createScalarSqlFunction(
         scalarFunction.functionIdentifier,
+        scalarFunction.toString,
         scalarFunction,
         typeFactory),
       parameters.map(_.toRexNode): _*)

http://git-wip-us.apache.org/repos/asf/flink/blob/427dfe42/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/ScalarFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/ScalarFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/ScalarFunction.scala
index 40c60ac..e41b876 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/ScalarFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/ScalarFunction.scala
@@ -56,8 +56,6 @@ abstract class ScalarFunction extends UserDefinedFunction {
     ScalarFunctionCall(this, params)
   }
 
-  override def toString: String = getClass.getCanonicalName
-
   // ----------------------------------------------------------------------------------------------
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/427dfe42/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TableFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TableFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TableFunction.scala
index b6e801a..ff69954 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TableFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TableFunction.scala
@@ -81,8 +81,6 @@ import org.apache.flink.util.Collector
   */
 abstract class TableFunction[T] extends UserDefinedFunction {
 
-  override def toString: String = getClass.getCanonicalName
-
   // ----------------------------------------------------------------------------------------------
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/427dfe42/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala
index b841b31..15bcb17 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala
@@ -41,7 +41,7 @@ abstract class UserDefinedFunction extends Serializable {
   def close(): Unit = {}
 
   /**
-    * @return true iff a call to this function is guaranteed to always return
+    * @return true if and only if a call to this function is guaranteed to always return
     *         the same result given the same parameters; true is assumed by default
     *         if user's function is not pure functional, like random(), date(), now()...
     *         isDeterministic must return false
@@ -52,4 +52,10 @@ abstract class UserDefinedFunction extends Serializable {
     val md5 = DigestUtils.md5Hex(serialize(this))
     getClass.getCanonicalName.replace('.', '$').concat("$").concat(md5)
   }
+
+  /**
+    * Returns the name of the UDF that is used for plan explain and logging.
+    */
+  override def toString: String = getClass.getSimpleName
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/427dfe42/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala
index bb71d63..f44598b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala
@@ -35,6 +35,7 @@ import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
   * Calcite wrapper for user-defined aggregate functions.
   *
   * @param name function name (used by SQL parser)
+  * @param displayName name to be displayed in operator name
   * @param aggregateFunction aggregate function to be called
   * @param returnType the type information of returned value
   * @param accType the type information of the accumulator
@@ -42,6 +43,7 @@ import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
   */
 class AggSqlFunction(
     name: String,
+    displayName: String,
     aggregateFunction: AggregateFunction[_, _],
     val returnType: TypeInformation[_],
     val accType: TypeInformation[_],
@@ -62,19 +64,29 @@ class AggSqlFunction(
   def getFunction: AggregateFunction[_, _] = aggregateFunction
 
   override def isDeterministic: Boolean = aggregateFunction.isDeterministic
+
+  override def toString: String = displayName
 }
 
 object AggSqlFunction {
 
   def apply(
       name: String,
+      displayName: String,
       aggregateFunction: AggregateFunction[_, _],
       returnType: TypeInformation[_],
       accType: TypeInformation[_],
       typeFactory: FlinkTypeFactory,
       requiresOver: Boolean): AggSqlFunction = {
 
-    new AggSqlFunction(name, aggregateFunction, returnType, accType, typeFactory, requiresOver)
+    new AggSqlFunction(
+      name,
+      displayName,
+      aggregateFunction,
+      returnType,
+      accType,
+      typeFactory,
+      requiresOver)
   }
 
   private[flink] def createOperandTypeInference(

http://git-wip-us.apache.org/repos/asf/flink/blob/427dfe42/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
index 784bca7..27e093d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
@@ -35,11 +35,13 @@ import scala.collection.JavaConverters._
   * Calcite wrapper for user-defined scalar functions.
   *
   * @param name function name (used by SQL parser)
+  * @param displayName name to be displayed in operator name
   * @param scalarFunction scalar function to be called
   * @param typeFactory type factory for converting Flink's between Calcite's types
   */
 class ScalarSqlFunction(
     name: String,
+    displayName: String,
     scalarFunction: ScalarFunction,
     typeFactory: FlinkTypeFactory)
   extends SqlFunction(
@@ -53,6 +55,8 @@ class ScalarSqlFunction(
   def getScalarFunction = scalarFunction
 
   override def isDeterministic: Boolean = scalarFunction.isDeterministic
+
+  override def toString: String = displayName
 }
 
 object ScalarSqlFunction {

http://git-wip-us.apache.org/repos/asf/flink/blob/427dfe42/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/TableSqlFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/TableSqlFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/TableSqlFunction.scala
index 6d9742c..741d15b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/TableSqlFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/TableSqlFunction.scala
@@ -37,6 +37,7 @@ import org.apache.flink.table.functions.utils.TableSqlFunction._
   */
 class TableSqlFunction(
     name: String,
+    displayName: String,
     tableFunction: TableFunction[_],
     rowTypeInfo: TypeInformation[_],
     typeFactory: FlinkTypeFactory,
@@ -66,6 +67,8 @@ class TableSqlFunction(
   def getPojoFieldMapping: Array[Int] = functionImpl.fieldIndexes
 
   override def isDeterministic: Boolean = tableFunction.isDeterministic
+
+  override def toString: String = displayName
 }
 
 object TableSqlFunction {

http://git-wip-us.apache.org/repos/asf/flink/blob/427dfe42/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
index 6a90569..3cd694a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
@@ -251,10 +251,11 @@ object UserDefinedFunctionUtils {
     */
   def createScalarSqlFunction(
       name: String,
+      displayName: String,
       function: ScalarFunction,
       typeFactory: FlinkTypeFactory)
     : SqlFunction = {
-    new ScalarSqlFunction(name, function, typeFactory)
+    new ScalarSqlFunction(name, displayName, function, typeFactory)
   }
 
   /**
@@ -268,13 +269,14 @@ object UserDefinedFunctionUtils {
     */
   def createTableSqlFunction(
       name: String,
+      displayName: String,
       tableFunction: TableFunction[_],
       resultType: TypeInformation[_],
       typeFactory: FlinkTypeFactory)
     : SqlFunction = {
     val (fieldNames, fieldIndexes, _) = UserDefinedFunctionUtils.getFieldInfo(resultType)
     val function = new FlinkTableFunctionImpl(resultType, fieldIndexes, fieldNames)
-    new TableSqlFunction(name, tableFunction, resultType, typeFactory, function)
+    new TableSqlFunction(name, displayName, tableFunction, resultType, typeFactory, function)
   }
 
   /**
@@ -287,6 +289,7 @@ object UserDefinedFunctionUtils {
     */
   def createAggregateSqlFunction(
       name: String,
+      displayName: String,
       aggFunction: AggregateFunction[_, _],
       resultType: TypeInformation[_],
       accTypeInfo: TypeInformation[_],
@@ -297,6 +300,7 @@ object UserDefinedFunctionUtils {
 
     AggSqlFunction(
       name,
+      displayName,
       aggFunction,
       resultType,
       accTypeInfo,

http://git-wip-us.apache.org/repos/asf/flink/blob/427dfe42/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
index 559d20d..0c8efd7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
@@ -728,6 +728,7 @@ case class LogicalTableFunctionCall(
     val typeFactory = relBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory]
     val sqlFunction = new TableSqlFunction(
       tableFunction.functionIdentifier,
+      tableFunction.toString,
       tableFunction,
       resultType,
       typeFactory,

http://git-wip-us.apache.org/repos/asf/flink/blob/427dfe42/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
index 7c01fde..c53f090 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
@@ -179,21 +179,29 @@ trait CommonCorrelate {
   }
 
   private[flink] def selectToString(rowType: RelDataType): String = {
-    rowType.getFieldNames.asScala.mkString(",")
+    rowType.getFieldNames.asScala.mkString(", ")
   }
 
   private[flink] def correlateOpName(
+      inputType: RelDataType,
       rexCall: RexCall,
       sqlFunction: TableSqlFunction,
-      rowType: RelDataType)
+      rowType: RelDataType,
+      expression: (RexNode, List[String], Option[List[RexNode]]) => String)
     : String = {
 
-    s"correlate: ${correlateToString(rexCall, sqlFunction)}, select: ${selectToString(rowType)}"
+    s"correlate: ${correlateToString(inputType, rexCall, sqlFunction, expression)}," +
+      s" select: ${selectToString(rowType)}"
   }
 
-  private[flink] def correlateToString(rexCall: RexCall, sqlFunction: TableSqlFunction): String = {
-    val udtfName = sqlFunction.getName
-    val operands = rexCall.getOperands.asScala.map(_.toString).mkString(",")
+  private[flink] def correlateToString(
+      inputType: RelDataType,
+      rexCall: RexCall,
+      sqlFunction: TableSqlFunction,
+      expression: (RexNode, List[String], Option[List[RexNode]]) => String): String = {
+    val inFields = inputType.getFieldNames.asScala.toList
+    val udtfName = sqlFunction.toString
+    val operands = rexCall.getOperands.asScala.map(expression(_, inFields, None)).mkString(", ")
     s"table($udtfName($operands))"
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/427dfe42/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala
index 731d2e5..5f94562 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala
@@ -76,7 +76,7 @@ class DataSetCorrelate(
   override def toString: String = {
     val rexCall = scan.getCall.asInstanceOf[RexCall]
     val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction]
-    correlateToString(rexCall, sqlFunction)
+    correlateToString(joinRowType, rexCall, sqlFunction, getExpressionString)
   }
 
   override def explainTerms(pw: RelWriter): RelWriter = {
@@ -84,7 +84,11 @@ class DataSetCorrelate(
     val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction]
     super.explainTerms(pw)
       .item("invocation", scan.getCall)
-      .item("function", sqlFunction.getTableFunction.getClass.getCanonicalName)
+      .item("correlate", correlateToString(
+        inputNode.getRowType,
+        rexCall, sqlFunction,
+        getExpressionString))
+      .item("select", selectToString(relRowType))
       .item("rowType", relRowType)
       .item("joinType", joinType)
       .itemIf("condition", condition.orNull, condition.isDefined)
@@ -103,8 +107,6 @@ class DataSetCorrelate(
     val pojoFieldMapping = Some(sqlFunction.getPojoFieldMapping)
     val udtfTypeInfo = sqlFunction.getRowTypeInfo.asInstanceOf[TypeInformation[Any]]
 
-    val returnType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
-
     val flatMap = generateFunction(
       config,
       new RowSchema(getInput.getRowType),
@@ -131,6 +133,14 @@ class DataSetCorrelate(
       collector.code,
       flatMap.returnType)
 
-    inputDS.flatMap(mapFunc).name(correlateOpName(rexCall, sqlFunction, relRowType))
+    inputDS
+      .flatMap(mapFunc)
+      .name(correlateOpName(
+        inputNode.getRowType,
+        rexCall,
+        sqlFunction,
+        relRowType,
+        getExpressionString)
+      )
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/427dfe42/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
index 18ab2a3..4c702ee 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
@@ -69,7 +69,7 @@ class DataStreamCorrelate(
   override def toString: String = {
     val rexCall = scan.getCall.asInstanceOf[RexCall]
     val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction]
-    correlateToString(rexCall, sqlFunction)
+    correlateToString(inputSchema.relDataType, rexCall, sqlFunction, getExpressionString)
   }
 
   override def explainTerms(pw: RelWriter): RelWriter = {
@@ -77,7 +77,11 @@ class DataStreamCorrelate(
     val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction]
     super.explainTerms(pw)
       .item("invocation", scan.getCall)
-      .item("function", sqlFunction.getTableFunction.getClass.getCanonicalName)
+      .item("correlate", correlateToString(
+        inputSchema.relDataType,
+        rexCall, sqlFunction,
+        getExpressionString))
+      .item("select", selectToString(schema.relDataType))
       .item("rowType", schema.relDataType)
       .item("joinType", joinType)
       .itemIf("condition", condition.orNull, condition.isDefined)
@@ -130,7 +134,13 @@ class DataStreamCorrelate(
       .process(processFunc)
       // preserve input parallelism to ensure that acc and retract messages remain in order
       .setParallelism(inputParallelism)
-      .name(correlateOpName(rexCall, sqlFunction, schema.relDataType))
+      .name(correlateOpName(
+        inputSchema.relDataType,
+        rexCall,
+        sqlFunction,
+        schema.relDataType,
+        getExpressionString)
+      )
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/427dfe42/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalUnnestRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalUnnestRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalUnnestRule.scala
index 802fd85..23dfc03 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalUnnestRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalUnnestRule.scala
@@ -84,6 +84,7 @@ class LogicalUnnestRule(
           // create table function
           val explodeTableFunc = UserDefinedFunctionUtils.createTableSqlFunction(
             "explode",
+            "explode",
             ExplodeFunctionUtil.explodeTableFuncFromType(arrayType.typeInfo),
             FlinkTypeFactory.toTypeInfo(arrayType.getComponentType),
             cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory])

http://git-wip-us.apache.org/repos/asf/flink/blob/427dfe42/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java
index 14f812a..61f43dc 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java
@@ -110,6 +110,11 @@ public class JavaUserDefinedAggFunctions {
 				acc.sum += a.sum;
 			}
 		}
+
+		@Override
+		public String toString() {
+			return "myWeightedAvg";
+		}
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/427dfe42/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala
index 59d2a47..486b078 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala
@@ -238,7 +238,7 @@ class TableSourceTest extends TableTestBase {
         Array("name", "id", "amount", "price"),
         "'amount > 2"),
       term("select", "price", "id", "amount"),
-      term("where", s"<(${func.functionIdentifier}(amount), 32)")
+      term("where", s"<(${Func0.getClass.getSimpleName}(amount), 32)")
     )
 
     util.verifyTable(result, expected)

http://git-wip-us.apache.org/repos/asf/flink/blob/427dfe42/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/CorrelateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/CorrelateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/CorrelateTest.scala
index a71f11c..6942a4e 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/CorrelateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/CorrelateTest.scala
@@ -42,7 +42,8 @@ class CorrelateTest extends TableTestBase {
         "DataSetCorrelate",
         batchTableNode(0),
         term("invocation", "func1($cor0.c)"),
-        term("function", func1.getClass.getCanonicalName),
+        term("correlate", s"table(func1($$cor0.c))"),
+        term("select", "a", "b", "c", "f0"),
         term("rowType",
              "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) f0)"),
         term("joinType", "INNER")
@@ -62,7 +63,8 @@ class CorrelateTest extends TableTestBase {
         "DataSetCorrelate",
         batchTableNode(0),
         term("invocation", "func1($cor0.c, '$')"),
-        term("function", func1.getClass.getCanonicalName),
+        term("correlate", s"table(func1($$cor0.c, '$$'))"),
+        term("select", "a", "b", "c", "f0"),
         term("rowType",
              "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) f0)"),
         term("joinType", "INNER")
@@ -88,7 +90,8 @@ class CorrelateTest extends TableTestBase {
         "DataSetCorrelate",
         batchTableNode(0),
         term("invocation", "func1($cor0.c)"),
-        term("function", func1.getClass.getCanonicalName),
+        term("correlate", s"table(func1($$cor0.c))"),
+        term("select", "a", "b", "c", "f0"),
         term("rowType",
              "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) f0)"),
         term("joinType", "LEFT")
@@ -114,7 +117,8 @@ class CorrelateTest extends TableTestBase {
         "DataSetCorrelate",
         batchTableNode(0),
         term("invocation", "func2($cor0.c)"),
-        term("function", func2.getClass.getCanonicalName),
+        term("correlate", s"table(func2($$cor0.c))"),
+        term("select", "a", "b", "c", "f0", "f1"),
         term("rowType",
              "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, " +
                "VARCHAR(65536) f0, INTEGER f1)"),
@@ -141,7 +145,8 @@ class CorrelateTest extends TableTestBase {
         "DataSetCorrelate",
         batchTableNode(0),
         term("invocation", "hierarchy($cor0.c)"),
-        term("function", function.getClass.getCanonicalName),
+        term("correlate", s"table(hierarchy($$cor0.c))"),
+        term("select", "a", "b", "c", "f0", "f1", "f2"),
         term("rowType",
              "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c," +
                " VARCHAR(65536) f0, BOOLEAN f1, INTEGER f2)"),
@@ -168,7 +173,8 @@ class CorrelateTest extends TableTestBase {
         "DataSetCorrelate",
         batchTableNode(0),
         term("invocation", "pojo($cor0.c)"),
-        term("function", function.getClass.getCanonicalName),
+        term("correlate", s"table(pojo($$cor0.c))"),
+        term("select", "a", "b", "c", "age", "name"),
         term("rowType",
              "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c," +
                " INTEGER age, VARCHAR(65536) name)"),
@@ -196,7 +202,8 @@ class CorrelateTest extends TableTestBase {
         "DataSetCorrelate",
         batchTableNode(0),
         term("invocation", "func2($cor0.c)"),
-        term("function", func2.getClass.getCanonicalName),
+        term("correlate", s"table(func2($$cor0.c))"),
+        term("select", "a", "b", "c", "f0", "f1"),
         term("rowType",
              "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, " +
                "VARCHAR(65536) f0, INTEGER f1)"),
@@ -224,7 +231,8 @@ class CorrelateTest extends TableTestBase {
         "DataSetCorrelate",
         batchTableNode(0),
         term("invocation", "func1(SUBSTRING($cor0.c, 2))"),
-        term("function", func1.getClass.getCanonicalName),
+        term("correlate", s"table(func1(SUBSTRING($$cor0.c, 2)))"),
+        term("select", "a", "b", "c", "f0"),
         term("rowType",
              "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) f0)"),
         term("joinType", "INNER")
@@ -250,7 +258,8 @@ class CorrelateTest extends TableTestBase {
         "DataSetCorrelate",
         batchTableNode(0),
         term("invocation", "func1('hello', 'world', $cor0.c)"),
-        term("function", func1.getClass.getCanonicalName),
+        term("correlate", s"table(func1('hello', 'world', $$cor0.c))"),
+        term("select", "a", "b", "c", "f0"),
         term("rowType",
           "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) f0)"),
         term("joinType", "INNER")
@@ -272,7 +281,8 @@ class CorrelateTest extends TableTestBase {
         "DataSetCorrelate",
         batchTableNode(0),
         term("invocation", "func2('hello', 'world', $cor0.c)"),
-        term("function", func2.getClass.getCanonicalName),
+        term("correlate", s"table(func2('hello', 'world', $$cor0.c))"),
+        term("select", "a", "b", "c", "f0"),
         term("rowType",
           "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) f0)"),
         term("joinType", "INNER")

http://git-wip-us.apache.org/repos/asf/flink/blob/427dfe42/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/CalcTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/CalcTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/CalcTest.scala
index ee05547..ff6dcf1 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/CalcTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/CalcTest.scala
@@ -88,10 +88,10 @@ class CalcTest extends TableTestBase {
       "DataSetCalc",
       batchTableNode(0),
       term("select",
-        s"${giveMeCaseClass.functionIdentifier}().my AS _c0",
-        s"${giveMeCaseClass.functionIdentifier}().clazz AS _c1",
-        s"${giveMeCaseClass.functionIdentifier}().my AS _c2",
-        s"${giveMeCaseClass.functionIdentifier}().clazz AS _c3"
+        "giveMeCaseClass$().my AS _c0",
+        "giveMeCaseClass$().clazz AS _c1",
+        "giveMeCaseClass$().my AS _c2",
+        "giveMeCaseClass$().clazz AS _c3"
       )
     )
 
@@ -171,7 +171,7 @@ class CalcTest extends TableTestBase {
     val expected = unaryNode(
       "DataSetCalc",
       batchTableNode(0),
-      term("select", s"${MyHashCode.functionIdentifier}(c) AS _c0", "b")
+      term("select", "MyHashCode$(c) AS _c0", "b")
     )
 
     util.verifyTable(resultTable, expected)
@@ -283,7 +283,7 @@ class CalcTest extends TableTestBase {
           unaryNode(
             "DataSetCalc",
             batchTableNode(0),
-            term("select", "a", "c", s"${MyHashCode.functionIdentifier}(c) AS k")
+            term("select", "a", "c", "MyHashCode$(c) AS k")
           ),
           term("groupBy", "k"),
           term("select", "k", "SUM(a) AS TMP_0")

http://git-wip-us.apache.org/repos/asf/flink/blob/427dfe42/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/CorrelateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/CorrelateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/CorrelateTest.scala
index 15f3def..0b48070 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/CorrelateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/CorrelateTest.scala
@@ -21,7 +21,6 @@ package org.apache.flink.table.api.batch.table
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.utils.TableTestUtil._
-import org.apache.flink.table.runtime.utils._
 import org.apache.flink.table.utils.{TableFunc1, TableTestBase}
 import org.junit.Test
 
@@ -41,7 +40,8 @@ class CorrelateTest extends TableTestBase {
         "DataSetCorrelate",
         batchTableNode(0),
         term("invocation", s"${function.functionIdentifier}($$2)"),
-        term("function", function),
+        term("correlate", s"table(${function.getClass.getSimpleName}(c))"),
+        term("select", "a", "b", "c", "s"),
         term("rowType",
              "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) s)"),
         term("joinType", "INNER")
@@ -61,7 +61,8 @@ class CorrelateTest extends TableTestBase {
         "DataSetCorrelate",
         batchTableNode(0),
         term("invocation", s"${function.functionIdentifier}($$2, '$$')"),
-        term("function", function),
+        term("correlate", s"table(${function.getClass.getSimpleName}(c, '$$'))"),
+        term("select", "a", "b", "c", "s"),
         term("rowType",
              "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) s)"),
         term("joinType", "INNER")
@@ -86,7 +87,8 @@ class CorrelateTest extends TableTestBase {
         "DataSetCorrelate",
         batchTableNode(0),
         term("invocation", s"${function.functionIdentifier}($$2)"),
-        term("function", function),
+        term("correlate", s"table(${function.getClass.getSimpleName}(c))"),
+        term("select", "a", "b", "c", "s"),
         term("rowType",
           "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) s)"),
         term("joinType", "LEFT")

http://git-wip-us.apache.org/repos/asf/flink/blob/427dfe42/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/GroupWindowTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/GroupWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/GroupWindowTest.scala
index e441203..6a2f1a7 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/GroupWindowTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/GroupWindowTest.scala
@@ -71,7 +71,7 @@ class GroupWindowTest extends TableTestBase {
       batchTableNode(0),
       term("groupBy", "string"),
       term("window", TumblingGroupWindow(WindowReference("w"), 'long, 5.milli)),
-      term("select", "string", "WeightedAvgWithMerge(long, int) AS TMP_0")
+      term("select", "string", "myWeightedAvg(long, int) AS TMP_0")
     )
 
     util.verifyTable(windowedTable, expected)
@@ -212,7 +212,7 @@ class GroupWindowTest extends TableTestBase {
       term("groupBy", "string"),
       term("window",
            SlidingGroupWindow(WindowReference("w"), 'long, 8.milli, 10.milli)),
-      term("select", "string", "WeightedAvgWithMerge(long, int) AS TMP_0")
+      term("select", "string", "myWeightedAvg(long, int) AS TMP_0")
     )
 
     util.verifyTable(windowedTable, expected)
@@ -310,7 +310,7 @@ class GroupWindowTest extends TableTestBase {
       batchTableNode(0),
       term("groupBy", "string"),
       term("window", SessionGroupWindow(WindowReference("w"), 'long, 7.milli)),
-      term("select", "string", "WeightedAvgWithMerge(long, int) AS TMP_0")
+      term("select", "string", "myWeightedAvg(long, int) AS TMP_0")
     )
 
     util.verifyTable(windowedTable, expected)

http://git-wip-us.apache.org/repos/asf/flink/blob/427dfe42/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CorrelateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CorrelateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CorrelateTest.scala
index 955ed4b..ec61816 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CorrelateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CorrelateTest.scala
@@ -42,7 +42,8 @@ class CorrelateTest extends TableTestBase {
         "DataStreamCorrelate",
         streamTableNode(0),
         term("invocation", "func1($cor0.c)"),
-        term("function", func1.getClass.getCanonicalName),
+        term("correlate", s"table(func1($$cor0.c))"),
+        term("select", "a", "b", "c", "f0"),
         term("rowType",
              "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) f0)"),
         term("joinType", "INNER")
@@ -62,7 +63,8 @@ class CorrelateTest extends TableTestBase {
         "DataStreamCorrelate",
         streamTableNode(0),
         term("invocation", "func1($cor0.c, '$')"),
-        term("function", func1.getClass.getCanonicalName),
+        term("correlate", s"table(func1($$cor0.c, '$$'))"),
+        term("select", "a", "b", "c", "f0"),
         term("rowType",
              "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) f0)"),
         term("joinType", "INNER")
@@ -88,7 +90,8 @@ class CorrelateTest extends TableTestBase {
         "DataStreamCorrelate",
         streamTableNode(0),
         term("invocation", "func1($cor0.c)"),
-        term("function", func1.getClass.getCanonicalName),
+        term("correlate", s"table(func1($$cor0.c))"),
+        term("select", "a", "b", "c", "f0"),
         term("rowType",
              "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) f0)"),
         term("joinType", "LEFT")
@@ -114,7 +117,8 @@ class CorrelateTest extends TableTestBase {
         "DataStreamCorrelate",
         streamTableNode(0),
         term("invocation", "func2($cor0.c)"),
-        term("function", func2.getClass.getCanonicalName),
+        term("correlate", s"table(func2($$cor0.c))"),
+        term("select", "a", "b", "c", "f0", "f1"),
         term("rowType",
              "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, " +
                "VARCHAR(65536) f0, INTEGER f1)"),
@@ -141,7 +145,8 @@ class CorrelateTest extends TableTestBase {
         "DataStreamCorrelate",
         streamTableNode(0),
         term("invocation", "hierarchy($cor0.c)"),
-        term("function", function.getClass.getCanonicalName),
+        term("correlate", s"table(hierarchy($$cor0.c))"),
+        term("select", "a", "b", "c", "f0", "f1", "f2"),
         term("rowType",
              "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c," +
                " VARCHAR(65536) f0, BOOLEAN f1, INTEGER f2)"),
@@ -168,7 +173,8 @@ class CorrelateTest extends TableTestBase {
         "DataStreamCorrelate",
         streamTableNode(0),
         term("invocation", "pojo($cor0.c)"),
-        term("function", function.getClass.getCanonicalName),
+        term("correlate", s"table(pojo($$cor0.c))"),
+        term("select", "a", "b", "c", "age", "name"),
         term("rowType",
              "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c," +
                " INTEGER age, VARCHAR(65536) name)"),
@@ -196,7 +202,8 @@ class CorrelateTest extends TableTestBase {
         "DataStreamCorrelate",
         streamTableNode(0),
         term("invocation", "func2($cor0.c)"),
-        term("function", func2.getClass.getCanonicalName),
+        term("correlate", s"table(func2($$cor0.c))"),
+        term("select", "a", "b", "c", "f0", "f1"),
         term("rowType",
              "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, " +
                "VARCHAR(65536) f0, INTEGER f1)"),
@@ -224,7 +231,8 @@ class CorrelateTest extends TableTestBase {
         "DataStreamCorrelate",
         streamTableNode(0),
         term("invocation", "func1(SUBSTRING($cor0.c, 2))"),
-        term("function", func1.getClass.getCanonicalName),
+        term("correlate", s"table(func1(SUBSTRING($$cor0.c, 2)))"),
+        term("select", "a", "b", "c", "f0"),
         term("rowType",
              "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) f0)"),
         term("joinType", "INNER")
@@ -250,7 +258,8 @@ class CorrelateTest extends TableTestBase {
         "DataStreamCorrelate",
         streamTableNode(0),
         term("invocation", "func1('hello', 'world', $cor0.c)"),
-        term("function", func1.getClass.getCanonicalName),
+        term("correlate", s"table(func1('hello', 'world', $$cor0.c))"),
+        term("select", "a", "b", "c", "f0"),
         term("rowType",
           "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) f0)"),
         term("joinType", "INNER")
@@ -272,7 +281,8 @@ class CorrelateTest extends TableTestBase {
         "DataStreamCorrelate",
         streamTableNode(0),
         term("invocation", "func2('hello', 'world', $cor0.c)"),
-        term("function", func2.getClass.getCanonicalName),
+        term("correlate", s"table(func2('hello', 'world', $$cor0.c))"),
+        term("select", "a", "b", "c", "f0"),
         term("rowType",
           "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) f0)"),
         term("joinType", "INNER")

http://git-wip-us.apache.org/repos/asf/flink/blob/427dfe42/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/CorrelateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/CorrelateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/CorrelateTest.scala
index f15dea9..9d9d1db 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/CorrelateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/CorrelateTest.scala
@@ -19,8 +19,8 @@ package org.apache.flink.table.api.stream.table
 
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.scala._
+import org.apache.flink.table.expressions.utils.Func13
 import org.apache.flink.table.utils.TableTestUtil._
-import org.apache.flink.table.runtime.utils._
 import org.apache.flink.table.utils._
 import org.junit.Test
 
@@ -40,7 +40,8 @@ class CorrelateTest extends TableTestBase {
         "DataStreamCorrelate",
         streamTableNode(0),
         term("invocation", s"${function.functionIdentifier}($$2)"),
-        term("function", function),
+        term("correlate", s"table(${function.getClass.getSimpleName}(c))"),
+        term("select", "a", "b", "c", "s"),
         term("rowType",
              "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) s)"),
         term("joinType", "INNER")
@@ -60,7 +61,8 @@ class CorrelateTest extends TableTestBase {
         "DataStreamCorrelate",
         streamTableNode(0),
         term("invocation", s"${function.functionIdentifier}($$2, '$$')"),
-        term("function", function),
+        term("correlate", s"table(${function.getClass.getSimpleName}(c, '$$'))"),
+        term("select", "a", "b", "c", "s"),
         term("rowType",
              "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) s)"),
         term("joinType", "INNER")
@@ -85,7 +87,8 @@ class CorrelateTest extends TableTestBase {
         "DataStreamCorrelate",
         streamTableNode(0),
         term("invocation", s"${function.functionIdentifier}($$2)"),
-        term("function", function),
+        term("correlate", s"table(${function.getClass.getSimpleName}(c))"),
+        term("select", "a", "b", "c", "s"),
         term("rowType",
           "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) s)"),
         term("joinType", "LEFT")
@@ -101,16 +104,19 @@ class CorrelateTest extends TableTestBase {
     val util = streamTestUtil()
     val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
     val function = util.addFunction("func2", new TableFunc2)
+    val scalarFunc = new Func13("pre")
 
-    val result = table.join(function('c) as ('name, 'len)).select('c, 'name, 'len)
+    val result = table.join(function(scalarFunc('c)) as ('name, 'len)).select('c, 'name, 'len)
 
     val expected = unaryNode(
       "DataStreamCalc",
       unaryNode(
         "DataStreamCorrelate",
         streamTableNode(0),
-        term("invocation", s"${function.functionIdentifier}($$2)"),
-        term("function", function),
+        term("invocation",
+             s"${function.functionIdentifier}(${scalarFunc.functionIdentifier}($$2))"),
+        term("correlate", s"table(${function.getClass.getSimpleName}(Func13(c)))"),
+        term("select", "a", "b", "c", "name", "len"),
         term("rowType",
           "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, " +
            "VARCHAR(65536) name, INTEGER len)"),
@@ -134,7 +140,8 @@ class CorrelateTest extends TableTestBase {
       "DataStreamCorrelate",
       streamTableNode(0),
       term("invocation", s"${function.functionIdentifier}($$2)"),
-      term("function", function),
+      term("correlate", "table(HierarchyTableFunction(c))"),
+      term("select", "a", "b", "c", "name", "adult", "len"),
       term("rowType",
         "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c," +
         " VARCHAR(65536) name, BOOLEAN adult, INTEGER len)"),
@@ -156,7 +163,8 @@ class CorrelateTest extends TableTestBase {
       "DataStreamCorrelate",
       streamTableNode(0),
       term("invocation", s"${function.functionIdentifier}($$2)"),
-      term("function", function),
+      term("correlate", s"table(${function.getClass.getSimpleName}(c))"),
+      term("select", "a", "b", "c", "age", "name"),
       term("rowType",
         "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, " +
          "INTEGER age, VARCHAR(65536) name)"),
@@ -183,7 +191,8 @@ class CorrelateTest extends TableTestBase {
         "DataStreamCorrelate",
         streamTableNode(0),
         term("invocation", s"${function.functionIdentifier}($$2)"),
-        term("function", function),
+        term("correlate", s"table(${function.getClass.getSimpleName}(c))"),
+        term("select", "a", "b", "c", "name", "len"),
         term("rowType",
           "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, " +
           "VARCHAR(65536) name, INTEGER len)"),
@@ -208,7 +217,9 @@ class CorrelateTest extends TableTestBase {
         "DataStreamCorrelate",
         streamTableNode(0),
         term("invocation",  s"${function.functionIdentifier}(SUBSTRING($$2, 2, CHAR_LENGTH($$2)))"),
-        term("function", function),
+        term("correlate",
+             s"table(${function.getClass.getSimpleName}(SUBSTRING(c, 2, CHAR_LENGTH(c))))"),
+        term("select", "a", "b", "c", "s"),
         term("rowType",
           "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) s)"),
         term("joinType", "INNER")

http://git-wip-us.apache.org/repos/asf/flink/blob/427dfe42/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTest.scala
index 599c76b..260726b 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTest.scala
@@ -181,7 +181,7 @@ class GroupWindowTest extends TableTestBase {
           WindowReference("w"),
           'rowtime,
           5.milli)),
-      term("select", "string", "WeightedAvgWithMerge(long, int) AS TMP_0")
+      term("select", "string", "myWeightedAvg(long, int) AS TMP_0")
     )
 
     util.verifyTable(windowedTable, expected)
@@ -319,7 +319,7 @@ class GroupWindowTest extends TableTestBase {
       streamTableNode(0),
       term("groupBy", "string"),
       term("window", SlidingGroupWindow(WindowReference("w"), 'rowtime, 8.milli, 10.milli)),
-      term("select", "string", "WeightedAvgWithMerge(long, int) AS TMP_0")
+      term("select", "string", "myWeightedAvg(long, int) AS TMP_0")
     )
 
     util.verifyTable(windowedTable, expected)
@@ -363,7 +363,7 @@ class GroupWindowTest extends TableTestBase {
       streamTableNode(0),
       term("groupBy", "string"),
       term("window", SessionGroupWindow(WindowReference("w"), 'rowtime, 7.milli)),
-      term("select", "string", "WeightedAvgWithMerge(long, int) AS TMP_0")
+      term("select", "string", "myWeightedAvg(long, int) AS TMP_0")
     )
 
     util.verifyTable(windowedTable, expected)

http://git-wip-us.apache.org/repos/asf/flink/blob/427dfe42/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/OverWindowTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/OverWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/OverWindowTest.scala
index 8b563a3..55e3ecb 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/OverWindowTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/OverWindowTest.scala
@@ -23,7 +23,6 @@ import org.apache.flink.table.api.scala._
 import org.apache.flink.table.expressions.utils.Func1
 import org.apache.flink.table.api.Table
 import org.apache.flink.table.utils.TableTestUtil._
-import org.apache.flink.table.utils.StreamTableTestUtil
 import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
 import org.junit.Test
 
@@ -65,7 +64,7 @@ class OverWindowTest extends TableTestBase {
                "WeightedAvgWithRetract(c, a) AS w0$o2")
         ),
         term("select",
-             s"${plusOne.functionIdentifier}(w0$$o0) AS d",
+             s"Func1$$(w0$$o0) AS d",
              "EXP(CAST(w0$o1)) AS _c1",
              "+(w0$o2, 1) AS _c2",
              "||('AVG:', CAST(w0$o2)) AS _c3",

http://git-wip-us.apache.org/repos/asf/flink/blob/427dfe42/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/ExpressionReductionRulesTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/ExpressionReductionRulesTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/ExpressionReductionRulesTest.scala
index b4ad9ca..ce4de14 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/ExpressionReductionRulesTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/ExpressionReductionRulesTest.scala
@@ -491,7 +491,7 @@ class ExpressionReductionRulesTest extends TableTestBase {
       "DataStreamCalc",
       streamTableNode(0),
       term("select", "a", "b", "c"),
-      term("where", s"IS NULL(${NonDeterministicNullFunc.functionIdentifier}())")
+      term("where", s"IS NULL(NonDeterministicNullFunc$$())")
     )
 
     util.verifyTable(result, expected)

http://git-wip-us.apache.org/repos/asf/flink/blob/427dfe42/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
index cfff326..1714ec8 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
@@ -160,7 +160,8 @@ class TimeIndicatorConversionTest extends TableTestBase {
         streamTableNode(0),
         term("invocation",
           s"${func.functionIdentifier}(CAST($$0):TIMESTAMP(3) NOT NULL, PROCTIME($$3), '')"),
-        term("function", func),
+        term("correlate", s"table(TableFunc(CAST(rowtime), PROCTIME(proctime), ''))"),
+        term("select", "rowtime", "long", "int", "proctime", "s"),
         term("rowType", "RecordType(TIME ATTRIBUTE(ROWTIME) rowtime, BIGINT long, INTEGER int, " +
           "TIME ATTRIBUTE(PROCTIME) proctime, VARCHAR(65536) s)"),
         term("joinType", "INNER")

http://git-wip-us.apache.org/repos/asf/flink/blob/427dfe42/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
index c62349c..480d817 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
@@ -284,7 +284,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
     val func1 = new Func13("Sunny")
     val func2 = new Func13("kevin2")
 
-    val result = t.select(func0('c), func1('c),func2('c))
+    val result = t.select(func0('c), func1('c), func2('c))
 
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()


Mime
View raw message