beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tg...@apache.org
Subject [1/2] beam git commit: Fix Output Windows in OnTimerContext
Date Wed, 22 Mar 2017 01:04:39 GMT
Repository: beam
Updated Branches:
  refs/heads/master 7e97820c5 -> e1dc7a861


Fix Output Windows in OnTimerContext

When a User timer is delivered, output elements produced by that timer
firing should be placed within the same window as the timer is in.

Deliver timers in the window of their namespace in the DirectRunner.

Test that timer deliveries maintain the window the timer was emitted in.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/75100f8b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/75100f8b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/75100f8b

Branch: refs/heads/master
Commit: 75100f8bddb957522b4ce1a9e3cc2a4d60b2527c
Parents: 7e97820
Author: Thomas Groh <tgroh@google.com>
Authored: Tue Mar 21 11:26:09 2017 -0700
Committer: Thomas Groh <tgroh@google.com>
Committed: Tue Mar 21 18:03:51 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/core/SimpleDoFnRunner.java     | 13 ++++--
 .../direct/StatefulParDoEvaluatorFactory.java   | 17 +++++---
 .../apache/beam/sdk/transforms/ParDoTest.java   | 45 ++++++++++++++++++++
 3 files changed, 66 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/75100f8b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 2b93ca0..f5a559c 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -23,6 +23,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
@@ -766,22 +767,26 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
 
     @Override
     public void output(OutputT output) {
-      context.outputWithTimestamp(output, timestamp);
+      context.outputWindowedValue(
+          output, timestamp(), Collections.singleton(window()), PaneInfo.NO_FIRING);
     }
 
     @Override
     public void outputWithTimestamp(OutputT output, Instant timestamp) {
-      context.outputWithTimestamp(output, timestamp);
+      context.outputWindowedValue(
+          output, timestamp, Collections.singleton(window()), PaneInfo.NO_FIRING);
     }
 
     @Override
     public <T> void sideOutput(TupleTag<T> tag, T output) {
-      context.sideOutputWithTimestamp(tag, output, timestamp);
+      context.sideOutputWindowedValue(
+          tag, output, timestamp, Collections.singleton(window()), PaneInfo.NO_FIRING);
     }
 
     @Override
     public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
-      context.sideOutputWithTimestamp(tag, output, timestamp);
+      context.sideOutputWindowedValue(
+          tag, output, timestamp, Collections.singleton(window()), PaneInfo.NO_FIRING);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/75100f8b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
index 0ad40ac..77bebb2 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
@@ -17,11 +17,12 @@
  */
 package org.apache.beam.runners.direct;
 
+import static com.google.common.base.Preconditions.checkState;
+
 import com.google.auto.value.AutoValue;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import java.util.Collections;
 import java.util.HashMap;
@@ -30,6 +31,7 @@ import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItems;
 import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.runners.core.StateNamespaces;
+import org.apache.beam.runners.core.StateNamespaces.WindowNamespace;
 import org.apache.beam.runners.core.StateTag;
 import org.apache.beam.runners.core.StateTags;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
@@ -228,15 +230,20 @@ final class StatefulParDoEvaluatorFactory<K, InputT, OutputT> implements Transfo
     @Override
     public void processElement(WindowedValue<KeyedWorkItem<K, KV<K, InputT>>> gbkResult)
         throws Exception {
-
-      BoundedWindow window = Iterables.getOnlyElement(gbkResult.getWindows());
-
       for (WindowedValue<KV<K, InputT>> windowedValue : gbkResult.getValue().elementsIterable()) {
         delegateEvaluator.processElement(windowedValue);
       }
 
       for (TimerData timer : gbkResult.getValue().timersIterable()) {
-        delegateEvaluator.onTimer(timer, window);
+        checkState(
+            timer.getNamespace() instanceof WindowNamespace,
+            "Expected Timer %s to be in a %s, but got %s",
+            timer,
+            WindowNamespace.class.getSimpleName(),
+            timer.getNamespace().getClass().getName());
+        WindowNamespace<?> windowNamespace = (WindowNamespace) timer.getNamespace();
+        BoundedWindow timerWindow = windowNamespace.getWindow();
+        delegateEvaluator.onTimer(timer, timerWindow);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/75100f8b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index e58f78e..d5786f1 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -101,6 +101,7 @@ import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.sdk.values.TypeDescriptor;
 import org.hamcrest.Matchers;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
@@ -1952,6 +1953,50 @@ public class ParDoTest implements Serializable {
     pipeline.run();
   }
 
+  @Test
+  @Category({RunnableOnService.class, UsesTimersInParDo.class})
+  public void testTimerReceivedInOriginalWindow() throws Exception {
+    final String timerId = "foo";
+
+    DoFn<KV<String, Integer>, BoundedWindow> fn =
+        new DoFn<KV<String, Integer>, BoundedWindow>() {
+
+          @TimerId(timerId)
+          private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+          @ProcessElement
+          public void processElement(ProcessContext context, @TimerId(timerId) Timer timer) {
+            timer.setForNowPlus(Duration.standardSeconds(1));
+          }
+
+          @OnTimer(timerId)
+          public void onTimer(OnTimerContext context, BoundedWindow window) {
+            context.output(context.window());
+          }
+
+          public TypeDescriptor<BoundedWindow> getOutputTypeDescriptor() {
+            return (TypeDescriptor) TypeDescriptor.of(IntervalWindow.class);
+          }
+        };
+
+    SlidingWindows windowing =
+        SlidingWindows.of(Duration.standardMinutes(3)).every(Duration.standardMinutes(1));
+    PCollection<BoundedWindow> output =
+        pipeline
+            .apply(Create.timestamped(TimestampedValue.of(KV.of("hello", 24), new Instant(0L))))
+            .apply(Window.<KV<String, Integer>>into(windowing))
+            .apply(ParDo.of(fn));
+
+    PAssert.that(output)
+        .containsInAnyOrder(
+            new IntervalWindow(new Instant(0), Duration.standardMinutes(3)),
+            new IntervalWindow(
+                new Instant(0).minus(Duration.standardMinutes(1)), Duration.standardMinutes(3)),
+            new IntervalWindow(
+                new Instant(0).minus(Duration.standardMinutes(2)), Duration.standardMinutes(3)));
+    pipeline.run();
+  }
+
   /**
    * Tests that an event time timer set absolutely for the last possible moment fires and results in
    * supplementary output. The test is otherwise identical to {@link #testEventTimeTimerBounded()}.


Mime
View raw message