beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [2/3] beam git commit: [BEAM-1727] Add align and offset to Timer
Date Thu, 04 May 2017 20:27:51 GMT
[BEAM-1727] Add align and offset to Timer


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4f934923
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4f934923
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4f934923

Branch: refs/heads/master
Commit: 4f934923d28798dfe7cd18c86ff4bcf8eebc27e5
Parents: e2aa889
Author: JingsongLi <lzljs3620320@aliyun.com>
Authored: Mon Mar 20 12:12:31 2017 +0800
Committer: Kenneth Knowles <klk@google.com>
Committed: Thu May 4 13:27:05 2017 -0700

----------------------------------------------------------------------
 .../construction/PTransformMatchersTest.java    |   2 +-
 .../beam/runners/core/SimpleDoFnRunner.java     |  44 +++++++-
 .../beam/runners/core/SimpleDoFnRunnerTest.java |   2 +-
 .../org/apache/beam/sdk/transforms/DoFn.java    |   2 +-
 .../java/org/apache/beam/sdk/util/Timer.java    |  27 +++--
 .../apache/beam/sdk/transforms/ParDoTest.java   | 113 ++++++++++++++++++-
 6 files changed, 171 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/4f934923/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
index e7d4c64..bb1b1cd 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
@@ -193,7 +193,7 @@ public class PTransformMatchersTest implements Serializable {
 
         @ProcessElement
         public void processElement(ProcessContext context, @TimerId(timerId) Timer timer) {
-          timer.setForNowPlus(Duration.standardSeconds(1));
+          timer.offset(Duration.standardSeconds(1)).setRelative();
           context.output(3);
         }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/4f934923/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 8a3e25f..7f29a6f 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -788,6 +788,8 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
     private final StateNamespace namespace;
     private final String timerId;
     private final TimerSpec spec;
+    private Duration period = Duration.ZERO;
+    private Duration offset = Duration.ZERO;
 
     public TimerInternalsTimer(
         BoundedWindow window,
@@ -812,12 +814,45 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
     }
 
     @Override
-    public void setForNowPlus(Duration durationFromNow) {
-      Instant target = getCurrentTime().plus(durationFromNow);
-      verifyTargetTime(target);
+    public void setRelative() {
+      Instant target;
+      Instant now = getCurrentTime();
+      if (period.equals(Duration.ZERO)) {
+        target = now.plus(offset);
+      } else {
+        long millisSinceStart = now.plus(offset).getMillis() % period.getMillis();
+        target = millisSinceStart == 0 ? now : now.plus(period).minus(millisSinceStart);
+      }
+      target = minTargetAndGcTime(target);
       setUnderlyingTimer(target);
     }
 
+    @Override
+    public Timer offset(Duration offset) {
+      this.offset = offset;
+      return this;
+    }
+
+    @Override
+    public Timer align(Duration period) {
+      this.period = period;
+      return this;
+    }
+
+    /**
+     * For event time timers the target time should be prior to window GC time, so this returns
+     * min(time to set, GC time of window).
+     */
+    private Instant minTargetAndGcTime(Instant target) {
+      if (TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) {
+        Instant windowExpiry = window.maxTimestamp().plus(allowedLateness);
+        if (target.isAfter(windowExpiry)) {
+          return windowExpiry;
+        }
+      }
+      return target;
+    }
+
     /**
      * Ensures that the target time is reasonable. For event time timers this means that the
      * time should be prior to window GC time.
@@ -836,7 +871,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
       if (!TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) {
         throw new IllegalStateException(
             "Cannot only set relative timers in processing time domain."
-                + " Use #setForNowPlus(Duration)");
+                + " Use #setRelative()");
       }
     }
 
@@ -867,5 +902,6 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
               String.format("Timer created for unknown time domain %s", spec.getTimeDomain()));
       }
     }
+
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/4f934923/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
index 3e404ad..9b63bab 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
@@ -388,7 +388,7 @@ public class SimpleDoFnRunnerTest {
 
     @ProcessElement
     public void process(ProcessContext context, @TimerId(TIMER_ID) Timer timer) {
-      timer.setForNowPlus(TIMER_OFFSET);
+      timer.offset(TIMER_OFFSET).setRelative();
     }
 
     @OnTimer(TIMER_ID)

http://git-wip-us.apache.org/repos/asf/beam/blob/4f934923/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index 0368476..eab08f6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -385,7 +385,7 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
    *    public void processElement(
    *        ProcessContext c,
    *       {@literal @TimerId("my-timer-id") Timer myTimer}) {
-   *      myTimer.setForNowPlus(Duration.standardSeconds(...));
+   *      myTimer.offset(Duration.standardSeconds(...)).setRelative();
    *    }
    *
    *   {@literal @OnTimer("my-timer-id")}

http://git-wip-us.apache.org/repos/asf/beam/blob/4f934923/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Timer.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Timer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Timer.java
index 45a2a66..9727969 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Timer.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Timer.java
@@ -49,19 +49,30 @@ public interface Timer {
    *
    * <p>For {@link TimeDomain#PROCESSING_TIME}, the behavior is be unpredictable, since processing
    * time timers are ignored after a window has expired. Instead, it is recommended to use
-   * {@link #setForNowPlus(Duration)}.
+   * {@link #setRelative()}.
    */
-  void set(Instant instant);
-
-  /**
-   * Sets or resets the time relative to the current time in the timer's {@link TimeDomain} at which
-   * this it should fire. If the timer was already set, resets it to the new requested time.
-   */
-  void setForNowPlus(Duration durationFromNow);
+  void set(Instant absoluteTime);
 
   /**
    * Unsets this timer. It is permitted to {@code cancel()} whether or not the timer was actually
    * set.
    */
   void cancel();
+
+  /**
+   * Sets the timer relative to the current time, according to any offset and alignment specified
+   * using {@link #offset(Duration)} and {@link #align(Duration)}.
+   */
+  void setRelative();
+
+  /**
+   * Sets the offset used by {@link #setRelative()} when computing the target time.
+   */
+  Timer offset(Duration offset);
+
+  /**
+   * Aligns a timestamp to the next boundary of {@code period}.
+   */
+  Timer align(Duration period);
+
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/4f934923/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index d4475c9..1c919d4 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -2582,7 +2582,7 @@ public class ParDoTest implements Serializable {
 
           @ProcessElement
           public void processElement(ProcessContext context, @TimerId(timerId) Timer timer) {
-            timer.setForNowPlus(Duration.standardSeconds(1));
+            timer.offset(Duration.standardSeconds(1)).setRelative();
             context.output(3);
           }
 
@@ -2599,6 +2599,36 @@ public class ParDoTest implements Serializable {
 
   @Test
   @Category({ValidatesRunner.class, UsesTimersInParDo.class})
+  public void testEventTimeTimerAlignBounded() throws Exception {
+    final String timerId = "foo";
+
+    DoFn<KV<String, Integer>, KV<Integer, Instant>> fn =
+        new DoFn<KV<String, Integer>, KV<Integer, Instant>>() {
+
+          @TimerId(timerId)
+          private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+          @ProcessElement
+          public void processElement(ProcessContext context, @TimerId(timerId) Timer timer) {
+            timer.align(Duration.standardSeconds(1)).offset(Duration.millis(1)).setRelative();
+            context.output(KV.of(3, context.timestamp()));
+          }
+
+          @OnTimer(timerId)
+          public void onTimer(OnTimerContext context) {
+            context.output(KV.of(42, context.timestamp()));
+          }
+        };
+
+    PCollection<KV<Integer, Instant>> output =
+        pipeline.apply(Create.of(KV.of("hello", 37))).apply(ParDo.of(fn));
+    PAssert.that(output).containsInAnyOrder(KV.of(3, BoundedWindow.TIMESTAMP_MIN_VALUE),
+        KV.of(42, BoundedWindow.TIMESTAMP_MIN_VALUE.plus(1774)));
+    pipeline.run();
+  }
+
+  @Test
+  @Category({ValidatesRunner.class, UsesTimersInParDo.class})
   public void testTimerReceivedInOriginalWindow() throws Exception {
     final String timerId = "foo";
 
@@ -2610,7 +2640,7 @@ public class ParDoTest implements Serializable {
 
           @ProcessElement
           public void processElement(ProcessContext context, @TimerId(timerId) Timer timer) {
-            timer.setForNowPlus(Duration.standardSeconds(1));
+            timer.offset(Duration.standardSeconds(1)).setRelative();
           }
 
           @OnTimer(timerId)
@@ -2814,7 +2844,7 @@ public class ParDoTest implements Serializable {
 
           @ProcessElement
           public void processElement(ProcessContext context, @TimerId(timerId) Timer timer) {
-            timer.setForNowPlus(Duration.standardSeconds(1));
+            timer.offset(Duration.standardSeconds(1)).setRelative();
             context.output(3);
           }
 
@@ -2848,7 +2878,7 @@ public class ParDoTest implements Serializable {
 
           @ProcessElement
           public void processElement(ProcessContext context, @TimerId(timerId) Timer timer) {
-            timer.setForNowPlus(Duration.standardSeconds(1));
+            timer.offset(Duration.standardSeconds(1)).setRelative();
             context.output(3);
           }
 
@@ -2871,6 +2901,81 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
+  @Category({NeedsRunner.class, UsesTimersInParDo.class, UsesTestStream.class})
+  public void testEventTimeTimerAlignUnbounded() throws Exception {
+    final String timerId = "foo";
+
+    DoFn<KV<String, Integer>, KV<Integer, Instant>> fn =
+        new DoFn<KV<String, Integer>, KV<Integer, Instant>>() {
+
+          @TimerId(timerId)
+          private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+          @ProcessElement
+          public void processElement(ProcessContext context, @TimerId(timerId) Timer timer) {
+            timer.align(Duration.standardSeconds(1)).offset(Duration.millis(1)).setRelative();
+            context.output(KV.of(3, context.timestamp()));
+          }
+
+          @OnTimer(timerId)
+          public void onTimer(OnTimerContext context) {
+            context.output(KV.of(42, context.timestamp()));
+          }
+        };
+
+    TestStream<KV<String, Integer>> stream = TestStream.create(KvCoder
+        .of(StringUtf8Coder.of(), VarIntCoder.of()))
+        .advanceWatermarkTo(new Instant(5))
+        .addElements(KV.of("hello", 37))
+        .advanceWatermarkTo(new Instant(0).plus(Duration.standardSeconds(1).plus(1)))
+        .advanceWatermarkToInfinity();
+
+    PCollection<KV<Integer, Instant>> output = pipeline.apply(stream).apply(ParDo.of(fn));
+    PAssert.that(output).containsInAnyOrder(KV.of(3, new Instant(5)),
+        KV.of(42, new Instant(Duration.standardSeconds(1).minus(1).getMillis())));
+    pipeline.run();
+  }
+
+  @Test
+  @Category({NeedsRunner.class, UsesTimersInParDo.class, UsesTestStream.class})
+  public void testEventTimeTimerAlignAfterGcTimeUnbounded() throws Exception {
+    final String timerId = "foo";
+
+    DoFn<KV<String, Integer>, KV<Integer, Instant>> fn =
+        new DoFn<KV<String, Integer>, KV<Integer, Instant>>() {
+
+          @TimerId(timerId)
+          private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+          @ProcessElement
+          public void processElement(ProcessContext context, @TimerId(timerId) Timer timer) {
+            // This aligned time will exceed the END_OF_GLOBAL_WINDOW
+            timer.align(Duration.standardDays(1)).setRelative();
+            context.output(KV.of(3, context.timestamp()));
+          }
+
+          @OnTimer(timerId)
+          public void onTimer(OnTimerContext context) {
+            context.output(KV.of(42, context.timestamp()));
+          }
+        };
+
+    TestStream<KV<String, Integer>> stream = TestStream.create(KvCoder
+        .of(StringUtf8Coder.of(), VarIntCoder.of()))
+        // See GlobalWindow,
+        // END_OF_GLOBAL_WINDOW is TIMESTAMP_MAX_VALUE.minus(Duration.standardDays(1))
+        .advanceWatermarkTo(BoundedWindow.TIMESTAMP_MAX_VALUE.minus(Duration.standardDays(1)))
+        .addElements(KV.of("hello", 37))
+        .advanceWatermarkToInfinity();
+
+    PCollection<KV<Integer, Instant>> output = pipeline.apply(stream).apply(ParDo.of(fn));
+    PAssert.that(output).containsInAnyOrder(
+        KV.of(3, BoundedWindow.TIMESTAMP_MAX_VALUE.minus(Duration.standardDays(1))),
+        KV.of(42, BoundedWindow.TIMESTAMP_MAX_VALUE.minus(Duration.standardDays(1))));
+    pipeline.run();
+  }
+
+  @Test
   public void testWithOutputTagsDisplayData() {
     DoFn<String, String> fn = new DoFn<String, String>() {
       @ProcessElement


Mime
View raw message