beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [1/6] incubator-beam git commit: Only remove window from active window set if it is still active
Date Thu, 10 Mar 2016 23:08:57 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master b2b5f429f -> de91b8014


Only remove window from active window set if it is still active


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c415be87
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c415be87
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c415be87

Branch: refs/heads/master
Commit: c415be870d03fd9491982cc8e1100165e5c8323c
Parents: 0442a24
Author: Mark Shields <markshields@google.com>
Authored: Wed Mar 9 14:02:35 2016 -0800
Committer: Mark Shields <markshields@google.com>
Committed: Wed Mar 9 14:06:36 2016 -0800

----------------------------------------------------------------------
 .../dataflow/sdk/util/MergingActiveWindowSet.java      | 13 +++++--------
 .../google/cloud/dataflow/sdk/util/ReduceFnRunner.java |  9 ++++++---
 2 files changed, 11 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c415be87/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MergingActiveWindowSet.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MergingActiveWindowSet.java
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MergingActiveWindowSet.java
index 95e378d..5af4ea5 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MergingActiveWindowSet.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MergingActiveWindowSet.java
@@ -72,9 +72,7 @@ import javax.annotation.Nullable;
  */
 public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWindowSet<W>
{
   private final WindowFn<Object, W> windowFn;
-
-  @Nullable
-  private Map<W, Set<W>> activeWindowToStateAddressWindows;
+  private final Map<W, Set<W>> activeWindowToStateAddressWindows;
 
   /**
    * As above, but only for EPHEMERAL windows. Does not need to be persisted.
@@ -94,16 +92,14 @@ public class MergingActiveWindowSet<W extends BoundedWindow> implements
ActiveWi
    * MERGED. Otherwise W1 is EPHEMERAL.
    * </ul>
    */
-  @Nullable
-  private Map<W, W> windowToActiveWindow;
+  private final Map<W, W> windowToActiveWindow;
 
   /**
    * Deep clone of {@link #activeWindowToStateAddressWindows} as of last commit.
    *
    * <p>Used to avoid writing to state if no changes have been made during the work
unit.
    */
-  @Nullable
-  private Map<W, Set<W>> originalActiveWindowToStateAddressWindows;
+  private final Map<W, Set<W>> originalActiveWindowToStateAddressWindows;
 
   /**
    * Handle representing our state in the backend.
@@ -195,6 +191,7 @@ public class MergingActiveWindowSet<W extends BoundedWindow> implements
ActiveWi
 
   @Override
   public void remove(W window) {
+    Preconditions.checkState(isActive(window), "Window %s is not active", window);
     for (W stateAddressWindow : activeWindowToStateAddressWindows.get(window)) {
       windowToActiveWindow.remove(stateAddressWindow);
     }
@@ -522,7 +519,7 @@ public class MergingActiveWindowSet<W extends BoundedWindow> implements
ActiveWi
   private static <W> Map<W, Set<W>> deepCopy(Map<W, Set<W>>
multimap) {
     Map<W, Set<W>> newMultimap = new HashMap<>();
     for (Map.Entry<W, Set<W>> entry : multimap.entrySet()) {
-      newMultimap.put(entry.getKey(), new LinkedHashSet<W>(entry.getValue()));
+      newMultimap.put(entry.getKey(), new LinkedHashSet<>(entry.getValue()));
     }
     return newMultimap;
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c415be87/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
index 1a009bb..2b6e0d4 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
@@ -523,7 +523,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow>
{
     // - The trigger may implement isClosed as constant false.
     // - If the window function does not support windowing then all windows will be considered
     // active.
-    // So we must combine the above.
+    // So we must take conjunction of activeWindows and triggerRunner state.
     boolean windowIsActive =
         activeWindows.isActive(window) && !triggerRunner.isClosed(directContext.state());
 
@@ -602,7 +602,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow>
{
       boolean windowIsActive)
           throws Exception {
     if (windowIsActive) {
-      // Since window is still active the trigger has not closed.
+      // Since window was still active the trigger may not have closed.
       reduceFn.clearState(renamedContext);
       watermarkHold.clearHolds(renamedContext);
       nonEmptyPanes.clearPane(renamedContext.state());
@@ -622,7 +622,10 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow>
{
       }
     }
     paneInfoTracker.clear(directContext.state());
-    activeWindows.remove(directContext.window());
+    if (activeWindows.isActive(directContext.window())) {
+      // Don't need to track address state windows anymore
+      activeWindows.remove(directContext.window());
+    }
     // We'll never need to test for the trigger being closed again.
     triggerRunner.clearFinished(directContext.state());
   }


Mime
View raw message