beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [1/3] beam git commit: [BEAM-1116] Support for new Timer API in Flink runner (streaming)
Date Tue, 21 Feb 2017 10:05:06 GMT
Repository: beam
Updated Branches:
  refs/heads/master 8cfb3d125 -> 038ebd3bf


[BEAM-1116] Support for new Timer API in Flink runner (streaming)


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f86facb2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f86facb2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f86facb2

Branch: refs/heads/master
Commit: f86facb27170c34cc9ef2d702a51bdcd7e53836d
Parents: 0e9173d
Author: JingsongLi <lzljs3620320@aliyun.com>
Authored: Fri Feb 17 17:48:23 2017 +0800
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Tue Feb 21 09:53:05 2017 +0100

----------------------------------------------------------------------
 runners/flink/runner/pom.xml                    |  1 -
 .../FlinkStreamingTransformTranslators.java     | 16 ------
 .../wrappers/streaming/DoFnOperator.java        | 58 +++++++++++++++-----
 .../wrappers/streaming/WindowDoFnOperator.java  | 48 ----------------
 4 files changed, 44 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/f86facb2/runners/flink/runner/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/runner/pom.xml b/runners/flink/runner/pom.xml
index fbe2686..a7fae5d 100644
--- a/runners/flink/runner/pom.xml
+++ b/runners/flink/runner/pom.xml
@@ -90,7 +90,6 @@
                     org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders,
                     org.apache.beam.sdk.testing.UsesSetState,
                     org.apache.beam.sdk.testing.UsesMapState,
-                    org.apache.beam.sdk.testing.UsesTimersInParDo,
                     org.apache.beam.sdk.testing.UsesSplittableParDo,
                     org.apache.beam.sdk.testing.UsesAttemptedMetrics,
                     org.apache.beam.sdk.testing.UsesCommittedMetrics

http://git-wip-us.apache.org/repos/asf/beam/blob/f86facb2/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
index 757cdd2..cd0ef03 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
@@ -320,20 +320,6 @@ public class FlinkStreamingTransformTranslators {
     }
   }
 
-  private static void rejectTimers(DoFn<?, ?> doFn) {
-    DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
-
-    if (signature.timerDeclarations().size() > 0) {
-      throw new UnsupportedOperationException(
-          String.format(
-              "Found %s annotations on %s, but %s cannot yet be used with timers in the %s.",
-              DoFn.TimerId.class.getSimpleName(),
-              doFn.getClass().getName(),
-              DoFn.class.getSimpleName(),
-              FlinkRunner.class.getSimpleName()));
-    }
-  }
-
   private static class ParDoBoundStreamingTranslator<InputT, OutputT>
       extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
         ParDo.Bound<InputT, OutputT>> {
@@ -345,7 +331,6 @@ public class FlinkStreamingTransformTranslators {
 
       DoFn<InputT, OutputT> doFn = transform.getFn();
       rejectSplittable(doFn);
-      rejectTimers(doFn);
 
       WindowingStrategy<?, ?> windowingStrategy =
           context.getOutput(transform).getWindowingStrategy();
@@ -531,7 +516,6 @@ public class FlinkStreamingTransformTranslators {
 
       DoFn<InputT, OutputT> doFn = transform.getFn();
       rejectSplittable(doFn);
-      rejectTimers(doFn);
 
       // we assume that the transformation does not change the windowing strategy.
       WindowingStrategy<?, ?> windowingStrategy =

http://git-wip-us.apache.org/repos/asf/beam/blob/f86facb2/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 8a3dad2..29b6fbc 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -29,7 +29,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.core.AggregatorFactory;
 import org.apache.beam.runners.core.DoFnRunner;
@@ -41,6 +40,7 @@ import org.apache.beam.runners.core.SideInputHandler;
 import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.runners.core.StateNamespaces;
+import org.apache.beam.runners.core.StateNamespaces.WindowNamespace;
 import org.apache.beam.runners.core.StateTag;
 import org.apache.beam.runners.core.StateTags;
 import org.apache.beam.runners.core.TimerInternals;
@@ -179,7 +179,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
         TimerInternals.TimerDataCoder.of(windowingStrategy.getWindowFn().windowCoder());
   }
 
-  protected ExecutionContext.StepContext createStepContext() {
+  private ExecutionContext.StepContext createStepContext() {
     return new StepContext();
   }
 
@@ -306,7 +306,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
   protected final long getPushbackWatermarkHold() {
     // if we don't have side inputs we never hold the watermark
     if (sideInputs.isEmpty()) {
-      return Long.MAX_VALUE;
+      return BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis();
     }
 
     try {
@@ -325,7 +325,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
       BagState<WindowedValue<InputT>> pushedBack =
           pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
 
-      long min = TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE);
+      long min = BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis();
       for (WindowedValue<InputT> value : pushedBack.read()) {
         min = Math.min(min, value.getTimestamp().getMillis());
       }
@@ -398,7 +398,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
     }
 
     pushedBack.clear();
-    long min = TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE);
+    long min = BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis();
     for (WindowedValue<InputT> pushedBackValue : newPushedBack) {
       min = Math.min(min, pushedBackValue.getTimestamp().getMillis());
       pushedBack.add(pushedBackValue);
@@ -418,12 +418,36 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
 
   @Override
   public void processWatermark1(Watermark mark) throws Exception {
-    this.currentInputWatermark = mark.getTimestamp();
-    long potentialOutputWatermark =
-        Math.min(getPushbackWatermarkHold(), currentInputWatermark);
-    if (potentialOutputWatermark > currentOutputWatermark) {
-      currentOutputWatermark = potentialOutputWatermark;
-      output.emitWatermark(new Watermark(currentOutputWatermark));
+    if (keyCoder == null) {
+      this.currentInputWatermark = mark.getTimestamp();
+      long potentialOutputWatermark =
+          Math.min(getPushbackWatermarkHold(), currentInputWatermark);
+      if (potentialOutputWatermark > currentOutputWatermark) {
+        currentOutputWatermark = potentialOutputWatermark;
+        output.emitWatermark(new Watermark(currentOutputWatermark));
+      }
+    } else {
+      // fireTimers, so we need startBundle.
+      pushbackDoFnRunner.startBundle();
+
+      this.currentInputWatermark = mark.getTimestamp();
+
+      // hold back by the pushed back values waiting for side inputs
+      long actualInputWatermark = Math.min(getPushbackWatermarkHold(), mark.getTimestamp());
+
+      timerService.advanceWatermark(actualInputWatermark);
+
+      Instant watermarkHold = stateInternals.watermarkHold();
+
+      long combinedWatermarkHold = Math.min(watermarkHold.getMillis(), getPushbackWatermarkHold());
+
+      long potentialOutputWatermark = Math.min(currentInputWatermark, combinedWatermarkHold);
+
+      if (potentialOutputWatermark > currentOutputWatermark) {
+        currentOutputWatermark = potentialOutputWatermark;
+        output.emitWatermark(new Watermark(currentOutputWatermark));
+      }
+      pushbackDoFnRunner.finishBundle();
     }
   }
 
@@ -538,9 +562,15 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
     fireTimer(timer);
   }
 
+  // allow overriding this in WindowDoFnOperator
   public void fireTimer(InternalTimer<?, TimerData> timer) {
-    // Now not implement timers in StatefulPardo
-    throw new RuntimeException("The fireTimer should not be invoke in DoFnOperator.");
+    TimerInternals.TimerData timerData = timer.getNamespace();
+    StateNamespace namespace = timerData.getNamespace();
+    // This is a user timer, so namespace must be WindowNamespace
+    checkArgument(namespace instanceof WindowNamespace);
+    BoundedWindow window = ((WindowNamespace) namespace).getWindow();
+    pushbackDoFnRunner.onTimer(timerData.getTimerId(), window,
+        timerData.getTimestamp(), timerData.getDomain());
   }
 
   /**
@@ -638,7 +668,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
 
     @Override
     public TimerInternals timerInternals() {
-      throw new UnsupportedOperationException("Not supported for regular DoFns.");
+      return timerInternals;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/f86facb2/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
index d5594e6..b015f66 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
@@ -23,7 +23,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import org.apache.beam.runners.core.ExecutionContext;
 import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItems;
@@ -42,8 +41,6 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.flink.streaming.api.operators.InternalTimer;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.joda.time.Instant;
 
 /**
  * Flink operator for executing window {@link DoFn DoFns}.
@@ -81,7 +78,6 @@ public class WindowDoFnOperator<K, InputT, OutputT>
 
     this.systemReduceFn = systemReduceFn;
 
-
   }
 
   @Override
@@ -114,11 +110,6 @@ public class WindowDoFnOperator<K, InputT, OutputT>
   }
 
   @Override
-  protected ExecutionContext.StepContext createStepContext() {
-    return new WindowDoFnOperator.StepContext();
-  }
-
-  @Override
   public void fireTimer(InternalTimer<?, TimerData> timer) {
     pushbackDoFnRunner.processElement(WindowedValue.valueInGlobalWindow(
         KeyedWorkItems.<K, InputT>timersWorkItem(
@@ -126,43 +117,4 @@ public class WindowDoFnOperator<K, InputT, OutputT>
             Collections.singletonList(timer.getNamespace()))));
   }
 
-  @Override
-  public void processWatermark1(Watermark mark) throws Exception {
-    pushbackDoFnRunner.startBundle();
-
-    this.currentInputWatermark = mark.getTimestamp();
-
-    // hold back by the pushed back values waiting for side inputs
-    long actualInputWatermark = Math.min(getPushbackWatermarkHold(), mark.getTimestamp());
-
-    timerService.advanceWatermark(actualInputWatermark);
-
-    Instant watermarkHold = stateInternals.watermarkHold();
-
-    long combinedWatermarkHold = Math.min(watermarkHold.getMillis(), getPushbackWatermarkHold());
-
-    long potentialOutputWatermark = Math.min(currentInputWatermark, combinedWatermarkHold);
-
-    if (potentialOutputWatermark > currentOutputWatermark) {
-      currentOutputWatermark = potentialOutputWatermark;
-      output.emitWatermark(new Watermark(currentOutputWatermark));
-    }
-    pushbackDoFnRunner.finishBundle();
-
-  }
-
-  /**
-   * {@link StepContext} for running {@link DoFn DoFns} on Flink. This does now allow
-   * accessing state or timer internals.
-   */
-  protected class StepContext extends DoFnOperator.StepContext {
-
-    @Override
-    public TimerInternals timerInternals() {
-      return timerInternals;
-    }
-  }
-
-
-
 }


Mime
View raw message