flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject flink git commit: [FLINK-6138] [table] Create the ListStateDescriptor with the aggregationStateType instead of a serializer.
Date Tue, 21 Mar 2017 12:59:50 GMT
Repository: flink
Updated Branches:
  refs/heads/master 17dd915e8 -> e14135518


[FLINK-6138] [table] Create the ListStateDescriptor with the aggregationStateType instead of a serializer.

This closes #3581


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e1413551
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e1413551
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e1413551

Branch: refs/heads/master
Commit: e14135518d51e6b491f2cd512234b71f1cf1d716
Parents: 17dd915
Author: 金竹 <jincheng.sunjc@alibaba-inc.com>
Authored: Tue Mar 21 12:50:02 2017 +0800
Committer: Jark Wu <wuchong.wc@alibaba-inc.com>
Committed: Tue Mar 21 20:58:16 2017 +0800

----------------------------------------------------------------------
 .../UnboundedNonPartitionedProcessingOverProcessFunction.scala   | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e1413551/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedNonPartitionedProcessingOverProcessFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedNonPartitionedProcessingOverProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedNonPartitionedProcessingOverProcessFunction.scala
index 51c8315..7750511 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedNonPartitionedProcessingOverProcessFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedNonPartitionedProcessingOverProcessFunction.scala
@@ -98,9 +98,7 @@ class UnboundedNonPartitionedProcessingOverProcessFunction(
   }
 
   override def initializeState(context: FunctionInitializationContext): Unit = {
-    val stateSerializer =
-      aggregationStateType.createSerializer(getRuntimeContext.getExecutionConfig)
-    val accumulatorsDescriptor = new ListStateDescriptor[Row]("overState", stateSerializer)
+    val accumulatorsDescriptor = new ListStateDescriptor[Row]("overState", aggregationStateType)
     state = context.getOperatorStateStore.getOperatorState(accumulatorsDescriptor)
   }
 }


Mime
View raw message