flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [1/5] flink git commit: [FLINK-6859] [table] Do not delete timers in StateCleaningCountTrigger.
Date Mon, 19 Jun 2017 22:18:48 GMT
Repository: flink
Updated Branches:
  refs/heads/release-1.3 e96238406 -> b6d14b914


[FLINK-6859] [table] Do not delete timers in StateCleaningCountTrigger.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/78b5092d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/78b5092d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/78b5092d

Branch: refs/heads/release-1.3
Commit: 78b5092dc82fe36412e1d47c1a1fd81ef821d7c6
Parents: e962384
Author: sunjincheng121 <sunjincheng121@gmail.com>
Authored: Wed Jun 7 19:46:32 2017 +0800
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Mon Jun 19 16:20:40 2017 +0200

----------------------------------------------------------------------
 .../runtime/triggers/StateCleaningCountTrigger.scala |  4 ----
 .../StateCleaningCountTriggerHarnessTest.scala       | 15 ++++++++++-----
 2 files changed, 10 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/78b5092d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/triggers/StateCleaningCountTrigger.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/triggers/StateCleaningCountTrigger.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/triggers/StateCleaningCountTrigger.scala
index f3f9246..3c18449 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/triggers/StateCleaningCountTrigger.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/triggers/StateCleaningCountTrigger.scala
@@ -72,10 +72,6 @@ class StateCleaningCountTrigger(queryConfig: StreamQueryConfig, maxCount: Long)
         // register timer and remember clean-up time
         ctx.registerProcessingTimeTimer(cleanupTime)
 
-        if (null != curCleanupTime) {
-          ctx.deleteProcessingTimeTimer(curCleanupTime)
-        }
-
         ctx.getPartitionedState(cleanupStateDesc).update(cleanupTime)
       }
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/78b5092d/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/StateCleaningCountTriggerHarnessTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/StateCleaningCountTriggerHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/StateCleaningCountTriggerHarnessTest.scala
index 96601fb..93b89ca 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/StateCleaningCountTriggerHarnessTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/StateCleaningCountTriggerHarnessTest.scala
@@ -62,7 +62,7 @@ class StateCleaningCountTriggerHarnessTest {
 
     assertEquals(0, testHarness.numStateEntries)
 
-    // 3001 + 2000 >= 3001 register cleanup timer with 6001, and remove timer 3001
+    // 3001 + 2000 >= 3001 register cleanup timer with 6001
     assertEquals(
       TriggerResult.CONTINUE,
       testHarness.processElement(new StreamRecord(1), GlobalWindow.get))
@@ -70,7 +70,7 @@ class StateCleaningCountTriggerHarnessTest {
     // try to trigger onProcessingTime method via 4002, but there is non timer is triggered
     assertEquals(0, testHarness.advanceProcessingTime(4002).size())
 
-    // 4002 + 2000 >= 6001 register cleanup timer via 7002, and remove timer 6001
+    // 4002 + 2000 >= 6001 register cleanup timer via 7002
     assertEquals(
       TriggerResult.CONTINUE,
       testHarness.processElement(new StreamRecord(1), GlobalWindow.get))
@@ -80,8 +80,8 @@ class StateCleaningCountTriggerHarnessTest {
       TriggerResult.CONTINUE,
       testHarness.processElement(new StreamRecord(1), GlobalWindow.get))
 
-    // have one timer 7002
-    assertEquals(1, testHarness.numProcessingTimeTimers)
+    // have two timers 6001 and 7002
+    assertEquals(2, testHarness.numProcessingTimeTimers)
     assertEquals(0, testHarness.numEventTimeTimers)
     assertEquals(2, testHarness.numStateEntries)
     assertEquals(2, testHarness.numStateEntries(GlobalWindow.get))
@@ -115,9 +115,14 @@ class StateCleaningCountTriggerHarnessTest {
     assertEquals(1, testHarness.numStateEntries(GlobalWindow.get))
 
     // try to trigger onProcessingTime method via 7002, and all states are cleared
+    val timesIt = testHarness.advanceProcessingTime(7002).iterator()
+    assertEquals(
+      TriggerResult.CONTINUE,
+      timesIt.next().f1)
+
     assertEquals(
       TriggerResult.FIRE_AND_PURGE,
-      testHarness.advanceProcessingTime(7002).iterator().next().f1)
+      timesIt.next().f1)
 
     assertEquals(0, testHarness.numStateEntries)
   }


Mime
View raw message