beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [1/2] incubator-beam git commit: Use only WindowFn in TriggerTester
Date Fri, 24 Jun 2016 16:28:55 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master bf67d8edc -> bd21ead63


Use only WindowFn in TriggerTester

This change is preparatory for separating trigger syntax
from implementation.

Previously, the whole WindowingStrategy was passed in, but only
its WindowFn was actually used. Since the tester is really a test
of the trigger state machine, it will be moved to runners-core
alongside the trigger implementation. The requirement to provide a
WindowingStrategy with the original syntax is extraneous.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2cdc2be4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2cdc2be4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2cdc2be4

Branch: refs/heads/master
Commit: 2cdc2be45cd7722232b56e8fdef5670b33c337d8
Parents: 82ae661
Author: Kenneth Knowles <klk@google.com>
Authored: Thu Jun 23 21:24:22 2016 -0700
Committer: Kenneth Knowles <klk@google.com>
Committed: Thu Jun 23 21:30:08 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/util/ReduceFnRunner.java    |  3 ++-
 .../org/apache/beam/sdk/util/TriggerContextFactory.java  | 11 ++++++-----
 .../java/org/apache/beam/sdk/util/TriggerTester.java     |  2 +-
 3 files changed, 9 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2cdc2be4/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
index 864e8e7..2efc859 100644
--- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
@@ -236,7 +236,8 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
     this.triggerRunner =
         new TriggerRunner<>(
             windowingStrategy.getTrigger(),
-            new TriggerContextFactory<>(windowingStrategy, stateInternals, activeWindows));
+            new TriggerContextFactory<>(
+                windowingStrategy.getWindowFn(), stateInternals, activeWindows));
   }
 
   private ActiveWindowSet<W> createActiveWindowSet() {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2cdc2be4/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TriggerContextFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TriggerContextFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TriggerContextFactory.java
index 4855654..f7635d3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TriggerContextFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TriggerContextFactory.java
@@ -22,6 +22,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.Trigger;
 import org.apache.beam.sdk.transforms.windowing.Trigger.MergingTriggerInfo;
 import org.apache.beam.sdk.transforms.windowing.Trigger.TriggerInfo;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.state.MergingStateAccessor;
 import org.apache.beam.sdk.util.state.State;
 import org.apache.beam.sdk.util.state.StateAccessor;
@@ -50,19 +51,19 @@ import javax.annotation.Nullable;
  */
 public class TriggerContextFactory<W extends BoundedWindow> {
 
-  private final WindowingStrategy<?, W> windowingStrategy;
+  private final WindowFn<?, W> windowFn;
   private StateInternals<?> stateInternals;
   // Future triggers may be able to exploit the active window to state address window mapping.
   @SuppressWarnings("unused")
   private ActiveWindowSet<W> activeWindows;
   private final Coder<W> windowCoder;
 
-  public TriggerContextFactory(WindowingStrategy<?, W> windowingStrategy,
+  public TriggerContextFactory(WindowFn<?, W> windowFn,
       StateInternals<?> stateInternals, ActiveWindowSet<W> activeWindows) {
-    this.windowingStrategy = windowingStrategy;
+    this.windowFn = windowFn;
     this.stateInternals = stateInternals;
     this.activeWindows = activeWindows;
-    this.windowCoder = windowingStrategy.getWindowFn().windowCoder();
+    this.windowCoder = windowFn.windowCoder();
   }
 
   public Trigger.TriggerContext base(W window, Timers timers,
@@ -106,7 +107,7 @@ public class TriggerContextFactory<W extends BoundedWindow> {
 
     @Override
     public boolean isMerging() {
-      return !windowingStrategy.getWindowFn().isNonMerging();
+      return !windowFn.isNonMerging();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2cdc2be4/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java
index c495712..5af9ae9 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java
@@ -160,7 +160,7 @@ public class TriggerTester<InputT, W extends BoundedWindow> {
     this.windowToMergeResult = new HashMap<>();
 
     this.contextFactory =
-        new TriggerContextFactory<>(windowingStrategy, stateInternals, activeWindows);
+        new TriggerContextFactory<>(windowingStrategy.getWindowFn(), stateInternals, activeWindows);
   }
 
   /**


Mime
View raw message