flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [1/2] flink git commit: [FLINK-6887] [table] Split up CodeGenerator into several specific CodeGenerator
Date Mon, 17 Jul 2017 10:12:27 GMT
Repository: flink
Updated Branches:
  refs/heads/master ef0653aa8 -> 527e7499c


http://git-wip-us.apache.org/repos/asf/flink/blob/527e7499/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index 82f0051..c9f98e3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -35,7 +35,7 @@ import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWin
 import org.apache.flink.table.api.{StreamQueryConfig, TableException}
 import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
 import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.codegen.{AggregationCodeGenerator, CodeGenerator}
 import org.apache.flink.table.expressions.ExpressionUtils.isTimeIntervalLiteral
 import org.apache.flink.table.expressions._
 import org.apache.flink.table.functions.aggfunctions._
@@ -71,7 +71,7 @@ object AggregateUtil {
     * @param isRowsClause It is a tag that indicates whether the OVER clause is ROWS clause
     */
   private[flink] def createUnboundedOverProcessFunction(
-      generator: CodeGenerator,
+      generator: AggregationCodeGenerator,
       namedAggregates: Seq[CalcitePair[AggregateCall, String]],
       inputType: RelDataType,
       inputTypeInfo: TypeInformation[Row],
@@ -150,7 +150,7 @@ object AggregateUtil {
     * @return [[org.apache.flink.streaming.api.functions.ProcessFunction]]
     */
   private[flink] def createGroupAggregateFunction(
-      generator: CodeGenerator,
+      generator: AggregationCodeGenerator,
       namedAggregates: Seq[CalcitePair[AggregateCall, String]],
       inputRowType: RelDataType,
       inputFieldTypes: Seq[TypeInformation[_]],
@@ -211,7 +211,7 @@ object AggregateUtil {
     * @return [[org.apache.flink.streaming.api.functions.ProcessFunction]]
     */
   private[flink] def createBoundedOverProcessFunction(
-      generator: CodeGenerator,
+      generator: AggregationCodeGenerator,
       namedAggregates: Seq[CalcitePair[AggregateCall, String]],
       inputType: RelDataType,
       inputTypeInfo: TypeInformation[Row],
@@ -312,7 +312,7 @@ object AggregateUtil {
     * NOTE: this function is only used for time based window on batch tables.
     */
   def createDataSetWindowPrepareMapFunction(
-    generator: CodeGenerator,
+    generator: AggregationCodeGenerator,
     window: LogicalWindow,
     namedAggregates: Seq[CalcitePair[AggregateCall, String]],
     groupings: Array[Int],
@@ -418,7 +418,7 @@ object AggregateUtil {
     * NOTE: this function is only used for sliding windows with partial aggregates on batch
tables.
     */
   def createDataSetSlideWindowPrepareGroupReduceFunction(
-      generator: CodeGenerator,
+      generator: AggregationCodeGenerator,
       window: LogicalWindow,
       namedAggregates: Seq[CalcitePair[AggregateCall, String]],
       groupings: Array[Int],
@@ -530,7 +530,7 @@ object AggregateUtil {
     * NOTE: this function is only used for window on batch tables.
     */
   def createDataSetWindowAggregationGroupReduceFunction(
-      generator: CodeGenerator,
+      generator: AggregationCodeGenerator,
       window: LogicalWindow,
       namedAggregates: Seq[CalcitePair[AggregateCall, String]],
       physicalInputRowType: RelDataType,
@@ -681,7 +681,7 @@ object AggregateUtil {
     *
     */
   def createDataSetWindowAggregationMapPartitionFunction(
-    generator: CodeGenerator,
+    generator: AggregationCodeGenerator,
     window: LogicalWindow,
     namedAggregates: Seq[CalcitePair[AggregateCall, String]],
     physicalInputRowType: RelDataType,
@@ -754,7 +754,7 @@ object AggregateUtil {
     *
     */
   private[flink] def createDataSetWindowAggregationCombineFunction(
-      generator: CodeGenerator,
+      generator: AggregationCodeGenerator,
       window: LogicalWindow,
       namedAggregates: Seq[CalcitePair[AggregateCall, String]],
       physicalInputRowType: RelDataType,
@@ -819,7 +819,7 @@ object AggregateUtil {
     * respective output type are generated as well.
     */
   private[flink] def createDataSetAggregateFunctions(
-      generator: CodeGenerator,
+      generator: AggregationCodeGenerator,
       namedAggregates: Seq[CalcitePair[AggregateCall, String]],
       inputType: RelDataType,
       inputFieldTypeInfo: Seq[TypeInformation[_]],
@@ -992,7 +992,7 @@ object AggregateUtil {
   }
 
   private[flink] def createDataStreamAggregateFunction(
-      generator: CodeGenerator,
+      generator: AggregationCodeGenerator,
       namedAggregates: Seq[CalcitePair[AggregateCall, String]],
       inputType: RelDataType,
       inputFieldTypeInfo: Seq[TypeInformation[_]],

http://git-wip-us.apache.org/repos/asf/flink/blob/527e7499/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala
index e3e292e..5f274db 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala
@@ -40,7 +40,7 @@ import org.apache.flink.configuration.Configuration
 import org.apache.flink.core.fs.Path
 import org.apache.flink.table.api.{BatchTableEnvironment, TableConfig, TableEnvironment}
 import org.apache.flink.table.calcite.FlinkPlannerImpl
-import org.apache.flink.table.codegen.{CodeGenerator, Compiler, GeneratedFunction}
+import org.apache.flink.table.codegen.{FunctionCodeGenerator, Compiler, GeneratedFunction}
 import org.apache.flink.table.expressions.{Expression, ExpressionParser}
 import org.apache.flink.table.functions.ScalarFunction
 import org.apache.flink.table.plan.nodes.FlinkConventions
@@ -115,7 +115,7 @@ abstract class ExpressionTestBase {
   def evaluateExprs() = {
     val relBuilder = context._1
     val config = new TableConfig()
-    val generator = new CodeGenerator(config, false, typeInfo)
+    val generator = new FunctionCodeGenerator(config, false, typeInfo)
 
     // cast expressions to String
     val stringTestExprs = testExprs.map(expr => relBuilder.cast(expr._1, VARCHAR))


Mime
View raw message