beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [1/2] beam git commit: Test GBK immediately followed by stateful ParDo
Date Fri, 14 Jul 2017 20:05:54 GMT
Repository: beam
Updated Branches:
  refs/heads/master e8c557448 -> 621f20f02


Test GBK immediately followed by stateful ParDo


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5d9fe885
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5d9fe885
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5d9fe885

Branch: refs/heads/master
Commit: 5d9fe885b2581a98aeb1d470229b733eda52d1cd
Parents: e8c5574
Author: Kenneth Knowles <klk@google.com>
Authored: Fri Jul 14 08:02:00 2017 -0700
Committer: Kenneth Knowles <klk@google.com>
Committed: Fri Jul 14 13:05:08 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/transforms/ParDoTest.java   | 37 ++++++++++++++++++++
 1 file changed, 37 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/5d9fe885/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index fa4949e..142dff8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -2596,6 +2596,43 @@ public class ParDoTest implements Serializable {
     pipeline.run();
   }
 
+  /**
+   * Tests a GBK followed immediately by a {@link ParDo} that users timers. This checks a
common
+   * case where both GBK and the user code share a timer delivery bundle.
+   */
+  @Test
+  @Category({ValidatesRunner.class, UsesTimersInParDo.class})
+  public void testGbkFollowedByUserTimers() throws Exception {
+
+    DoFn<KV<String, Iterable<Integer>>, Integer> fn =
+        new DoFn<KV<String, Iterable<Integer>>, Integer>() {
+
+          public static final String TIMER_ID = "foo";
+
+          @TimerId(TIMER_ID)
+          private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+          @ProcessElement
+          public void processElement(ProcessContext context, @TimerId(TIMER_ID) Timer timer)
{
+            timer.offset(Duration.standardSeconds(1)).setRelative();
+            context.output(3);
+          }
+
+          @OnTimer(TIMER_ID)
+          public void onTimer(OnTimerContext context) {
+            context.output(42);
+          }
+        };
+
+    PCollection<Integer> output =
+        pipeline
+            .apply(Create.of(KV.of("hello", 37)))
+            .apply(GroupByKey.<String, Integer>create())
+            .apply(ParDo.of(fn));
+    PAssert.that(output).containsInAnyOrder(3, 42);
+    pipeline.run();
+  }
+
   @Test
   @Category({ValidatesRunner.class, UsesTimersInParDo.class})
   public void testEventTimeTimerAlignBounded() throws Exception {


Mime
View raw message