flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [2/2] flink git commit: [FLINK-7298] [table] Improve state clean-up of proctime window join.
Date Mon, 07 Aug 2017 16:10:38 GMT
[FLINK-7298] [table] Improve state clean-up of proctime window join.

This closes #4421.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8c9642f7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8c9642f7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8c9642f7

Branch: refs/heads/master
Commit: 8c9642f7d8c67251c6d59bba3a02661d8f537331
Parents: 6ea9dbb
Author: 军长 <hequn.chq@alibaba-inc.com>
Authored: Mon Jul 31 11:14:06 2017 +0800
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Mon Aug 7 16:24:28 2017 +0200

----------------------------------------------------------------------
 .../runtime/join/ProcTimeWindowInnerJoin.scala     | 17 +++++++++--------
 1 file changed, 9 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8c9642f7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeWindowInnerJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeWindowInnerJoin.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeWindowInnerJoin.scala
index 8a3ba43..e62a18f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeWindowInnerJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeWindowInnerJoin.scala
@@ -324,17 +324,18 @@ class ProcTimeWindowInnerJoin(
       }
     }
 
-    // Remove expired records from state
-    var i = removeList.size - 1
-    while (i >= 0) {
-      rowMapState.remove(removeList.get(i))
-      i -= 1
-    }
-    removeList.clear()
-
     // If the state has non-expired timestamps, register a new timer.
     // Otherwise clean the complete state for this input.
     if (validTimestamp) {
+
+      // Remove expired records from state
+      var i = removeList.size - 1
+      while (i >= 0) {
+        rowMapState.remove(removeList.get(i))
+        i -= 1
+      }
+      removeList.clear()
+
       val cleanupTime = curTime + winSize + 1
       ctx.timerService.registerProcessingTimeTimer(cleanupTime)
       timerState.update(cleanupTime)


Mime
View raw message