beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [08/16] beam git commit: Implement StepContext directly in the DirectRunner
Date Tue, 23 May 2017 18:18:07 GMT
Implement StepContext directly in the DirectRunner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5ac24e0a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5ac24e0a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5ac24e0a

Branch: refs/heads/master
Commit: 5ac24e0a89b95feafccbe381bdde9c11fdf82a88
Parents: 248c808
Author: Kenneth Knowles <klk@google.com>
Authored: Mon May 22 15:44:17 2017 -0700
Committer: Kenneth Knowles <klk@google.com>
Committed: Tue May 23 11:16:27 2017 -0700

----------------------------------------------------------------------
 .../runners/direct/DirectExecutionContext.java  | 33 +++++++++++++++++---
 1 file changed, 29 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/5ac24e0a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
index d676f24..2a75ef5 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
@@ -17,13 +17,18 @@
  */
 package org.apache.beam.runners.direct;
 
+import java.io.IOException;
 import org.apache.beam.runners.core.BaseExecutionContext;
-import org.apache.beam.runners.core.BaseStepContext;
 import org.apache.beam.runners.core.ExecutionContext;
+import org.apache.beam.runners.core.StepContext;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
 import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
 import org.apache.beam.runners.direct.WatermarkManager.TransformWatermarks;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
 
 /**
  * Execution Context for the {@link DirectRunner}.
@@ -57,14 +62,16 @@ class DirectExecutionContext
   /**
    * Step Context for the {@link DirectRunner}.
    */
-  public class DirectStepContext
-      extends BaseStepContext {
+  public class DirectStepContext implements StepContext {
     private CopyOnAccessInMemoryStateInternals<?> stateInternals;
     private DirectTimerInternals timerInternals;
+    private final String stepName;
+    private final String transformName;
 
     public DirectStepContext(
         ExecutionContext executionContext, String stepName, String transformName) {
-      super(stepName, transformName);
+      this.stepName = stepName;
+      this.transformName = transformName;
     }
 
     @Override
@@ -95,6 +102,24 @@ class DirectExecutionContext
       return null;
     }
 
+    @Override
+    public String getStepName() {
+      return stepName;
+    }
+
+    @Override
+    public String getTransformName() {
+      return transformName;
+    }
+
+    @Override
+    public <T, W extends BoundedWindow> void writePCollectionViewData(
+        TupleTag<?> tag,
+        Iterable<WindowedValue<T>> data, Coder<Iterable<WindowedValue<T>>> dataCoder,
+        W window, Coder<W> windowCoder) throws IOException {
+      throw new UnsupportedOperationException("Not implemented.");
+    }
+
     /**
      * Gets the timer update of the {@link TimerInternals} of this {@link DirectStepContext},
      * which is empty if the {@link TimerInternals} were never accessed.


Mime
View raw message