beam-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (Jira)" <j...@apache.org>
Subject [jira] [Work logged] (BEAM-10887) FnApiTimer should expose clear() API
Date Tue, 29 Jun 2021 16:41:00 GMT

     [ https://issues.apache.org/jira/browse/BEAM-10887?focusedWorklogId=616600&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-616600
]

ASF GitHub Bot logged work on BEAM-10887:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 29/Jun/21 16:40
            Start Date: 29/Jun/21 16:40
    Worklog Time Spent: 10m 
      Work Description: lukecwik commented on a change in pull request #15056:
URL: https://github.com/apache/beam/pull/15056#discussion_r660780333



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timer.java
##########
@@ -81,6 +81,9 @@
    */
   void setRelative();
 
+  /** Clears a timer. */

Review comment:
       ```suggestion
     /** Previously set timers will become unset. */
   ```

##########
File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
##########
@@ -670,7 +670,8 @@ private synchronized void updateTimers(TimerUpdate update) {
             existingTimersForKey.get(
                 deletedTimer.getNamespace(),
                 deletedTimer.getTimerId() + '+' + deletedTimer.getTimerFamilyId());
-
+        System.err.println(

Review comment:
       drop debugging statement

##########
File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
##########
@@ -4851,6 +4851,342 @@ public void onTimer2(
         return PDone.in(input.getPipeline());
       }
     }
+

Review comment:
       Should we cover the case where the timer becomes eligible and still fires even though
it is being cleared in the same bundle?
   
   e.g.
   set timer A for 1, set timer B for 2, advance time to 3, have timer A callback clear B,
B still fires since it is part of the same bundle

##########
File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
##########
@@ -173,48 +174,73 @@ public void processElement(WindowedValue<KeyedWorkItem<K, KV<K,
InputT>>> gbkRes
       for (WindowedValue<KV<K, InputT>> windowedValue : gbkResult.getValue().elementsIterable())
{
         delegateEvaluator.processElement(windowedValue);
       }
-
-      final Instant inputWatermarkTime = timerInternals.currentInputWatermarkTime();
       PriorityQueue<TimerData> toBeFiredTimers =
           new PriorityQueue<>(Comparator.comparing(TimerData::getTimestamp));
-      gbkResult.getValue().timersIterable().forEach(toBeFiredTimers::add);
 
-      while (!timerInternals.containsUpdateForTimeBefore(inputWatermarkTime)
+      Instant maxWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
+      Instant maxProcessingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
+      Instant maxSynchronizedProcessingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
+      for (TimerData timerData : gbkResult.getValue().timersIterable()) {
+        toBeFiredTimers.add(timerData);
+        switch (timerData.getDomain()) {
+          case EVENT_TIME:
+            maxWatermarkTime = Ordering.natural().max(maxWatermarkTime, timerData.getTimestamp());
+            break;
+          case PROCESSING_TIME:
+            maxProcessingTime = Ordering.natural().max(maxProcessingTime, timerData.getTimestamp());
+            break;
+          case SYNCHRONIZED_PROCESSING_TIME:
+            maxSynchronizedProcessingTime =
+                Ordering.natural().max(maxSynchronizedProcessingTime, timerData.getTimestamp());
+        }
+      }
+
+      while (!timerInternals.containsUpdateForTimeBefore(
+              maxWatermarkTime, maxProcessingTime, maxSynchronizedProcessingTime)
           && !toBeFiredTimers.isEmpty()) {
+
         TimerData timer = toBeFiredTimers.poll();
         checkState(
             timer.getNamespace() instanceof WindowNamespace,
-            "Expected Timer %s to be in a %s, but got %s",
+            "Expected Timer %s to be in a        %s, but got %s",

Review comment:
       ```suggestion
               "Expected Timer %s to be in a %s, but got %s",
   ```

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timer.java
##########
@@ -81,6 +81,9 @@
    */

Review comment:
       We should add documentation to Timer saying that set and/or clear calls may only become
visible after this bundle completes and may not be applied immediately allowing for existing
timers which have become eligible to still fire.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 616600)
    Time Spent: 3h 20m  (was: 3h 10m)

> FnApiTimer should expose clear() API
> ------------------------------------
>
>                 Key: BEAM-10887
>                 URL: https://issues.apache.org/jira/browse/BEAM-10887
>             Project: Beam
>          Issue Type: New Feature
>          Components: beam-model, sdk-java-core
>            Reporter: Boyuan Zhang
>            Priority: P3
>              Labels: Clarified
>          Time Spent: 3h 20m
>  Remaining Estimate: 0h
>
> Similar to python OutputTimer.clear():https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/bundle_processor.py#L661,
Java FnApiTimer should also expose clear() API.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Mime
View raw message