beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [34/51] [abbrv] incubator-beam git commit: Move ExecutionContext and related classes to runners-core
Date Wed, 21 Dec 2016 22:50:08 GMT
Move ExecutionContext and related classes to runners-core


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9d2b8e09
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9d2b8e09
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9d2b8e09

Branch: refs/heads/python-sdk
Commit: 9d2b8e09bcb5e04017b487e1a919d335875dbfc0
Parents: 64336e4
Author: Kenneth Knowles <klk@google.com>
Authored: Thu Dec 15 20:20:34 2016 -0800
Committer: Kenneth Knowles <klk@google.com>
Committed: Wed Dec 21 10:10:00 2016 -0800

----------------------------------------------------------------------
 .../operators/ApexParDoOperator.java            |   2 +-
 .../apex/translation/utils/NoOpStepContext.java |   3 +-
 .../beam/runners/core/AggregatorFactory.java    |   1 -
 .../beam/runners/core/BaseExecutionContext.java | 176 +++++++++++++++++++
 .../apache/beam/runners/core/DoFnRunners.java   |   2 +-
 .../beam/runners/core/ExecutionContext.java     | 102 +++++++++++
 .../beam/runners/core/SimpleDoFnRunner.java     |   2 +-
 .../beam/runners/core/SimpleOldDoFnRunner.java  |   2 +-
 .../beam/runners/core/SimpleDoFnRunnerTest.java |   2 +-
 .../runners/core/SimpleOldDoFnRunnerTest.java   |   3 +-
 .../runners/direct/AggregatorContainer.java     |   2 +-
 .../runners/direct/DirectExecutionContext.java  |   6 +-
 .../beam/runners/direct/EvaluationContext.java  |   2 +-
 .../runners/direct/AggregatorContainerTest.java |   2 +-
 .../wrappers/streaming/DoFnOperator.java        |   2 +-
 .../wrappers/streaming/WindowDoFnOperator.java  |   2 +-
 .../spark/aggregators/SparkAggregators.java     |   2 +-
 .../spark/translation/SparkProcessContext.java  |   2 +-
 .../beam/sdk/util/BaseExecutionContext.java     | 174 ------------------
 .../apache/beam/sdk/util/ExecutionContext.java  | 100 -----------
 20 files changed, 295 insertions(+), 294 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d2b8e09/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
index a3d3a97..c41cd45 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
@@ -41,6 +41,7 @@ import org.apache.beam.runners.core.AggregatorFactory;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
 import org.apache.beam.runners.core.DoFnRunners.OutputManager;
+import org.apache.beam.runners.core.ExecutionContext;
 import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
 import org.apache.beam.runners.core.SideInputHandler;
 import org.apache.beam.sdk.coders.Coder;
@@ -50,7 +51,6 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFnAdapters;
 import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.util.ExecutionContext;
 import org.apache.beam.sdk.util.NullSideInputReader;
 import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.UserCodeException;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d2b8e09/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
index 078f95f..f169ae6 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
@@ -19,10 +19,9 @@ package org.apache.beam.runners.apex.translation.utils;
 
 import java.io.IOException;
 import java.io.Serializable;
-
+import org.apache.beam.runners.core.ExecutionContext;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.ExecutionContext;
 import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.state.StateInternals;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d2b8e09/runners/core-java/src/main/java/org/apache/beam/runners/core/AggregatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/AggregatorFactory.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/AggregatorFactory.java
index 153d30d..24a605f 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/AggregatorFactory.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/AggregatorFactory.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.core;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.util.ExecutionContext;
 
 /**
  * A factory for creating aggregators.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d2b8e09/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java
new file mode 100644
index 0000000..7b674dc
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * Base class for implementations of {@link ExecutionContext}.
+ *
+ * <p>A concrete subclass should implement {@link #createStepContext} to create the
appropriate
+ * {@link StepContext} implementation. Any {@code StepContext} created will
+ * be cached for the lifetime of this {@link ExecutionContext}.
+ *
+ * <p>BaseExecutionContext is generic to allow implementing subclasses to return a
concrete subclass
+ * of {@link StepContext} from {@link #getOrCreateStepContext(String, String)} and
+ * {@link #getAllStepContexts()} without forcing each subclass to override the method, e.g.
+ * <pre>{@code
+ * {@literal @}Override
+ * StreamingModeExecutionContext.StepContext getOrCreateStepContext(...) {
+ *   return (StreamingModeExecutionContext.StepContext) super.getOrCreateStepContext(...);
+ * }
+ * }</pre>
+ *
+ * <p>When a subclass of {@code BaseExecutionContext} has been downcast, the return
types of
+ * {@link #createStepContext(String, String)},
+ * {@link #getOrCreateStepContext(String, String)}, and {@link #getAllStepContexts()}
+ * will be appropriately specialized.
+ */
+public abstract class BaseExecutionContext<T extends ExecutionContext.StepContext>
+    implements ExecutionContext {
+
+  private Map<String, T> cachedStepContexts = new HashMap<>();
+
+  /**
+   * Implementations should override this to create the specific type
+   * of {@link StepContext} they need.
+   */
+  protected abstract T createStepContext(String stepName, String transformName);
+
+  /**
+   * Returns the {@link StepContext} associated with the given step.
+   */
+  @Override
+  public T getOrCreateStepContext(String stepName, String transformName) {
+    final String finalStepName = stepName;
+    final String finalTransformName = transformName;
+    return getOrCreateStepContext(
+        stepName,
+        new CreateStepContextFunction<T>() {
+          @Override
+          public T create() {
+            return createStepContext(finalStepName, finalTransformName);
+          }
+        });
+  }
+
+  /**
+   * Factory method interface to create an execution context if none exists during
+   * {@link #getOrCreateStepContext(String, CreateStepContextFunction)}.
+   */
+  protected interface CreateStepContextFunction<T extends ExecutionContext.StepContext>
{
+    T create();
+  }
+
+  protected final T getOrCreateStepContext(String stepName,
+      CreateStepContextFunction<T> createContextFunc) {
+    T context = cachedStepContexts.get(stepName);
+    if (context == null) {
+      context = createContextFunc.create();
+      cachedStepContexts.put(stepName, context);
+    }
+
+    return context;
+  }
+
+  /**
+   * Returns a collection view of all of the {@link StepContext}s.
+   */
+  @Override
+  public Collection<? extends T> getAllStepContexts() {
+    return Collections.unmodifiableCollection(cachedStepContexts.values());
+  }
+
+  /**
+   * Hook for subclasses to implement that will be called whenever
+   * {@code DoFn.Context#output}
+   * is called.
+   */
+  @Override
+  public void noteOutput(WindowedValue<?> output) {}
+
+  /**
+   * Hook for subclasses to implement that will be called whenever
+   * {@code DoFn.Context#sideOutput}
+   * is called.
+   */
+  @Override
+  public void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output) {}
+
+  /**
+   * Base class for implementations of {@link ExecutionContext.StepContext}.
+   *
+   * <p>To complete a concrete subclass, implement {@link #timerInternals} and
+   * {@link #stateInternals}.
+   */
+  public abstract static class StepContext implements ExecutionContext.StepContext {
+    private final ExecutionContext executionContext;
+    private final String stepName;
+    private final String transformName;
+
+    public StepContext(ExecutionContext executionContext, String stepName, String transformName)
{
+      this.executionContext = executionContext;
+      this.stepName = stepName;
+      this.transformName = transformName;
+    }
+
+    @Override
+    public String getStepName() {
+      return stepName;
+    }
+
+    @Override
+    public String getTransformName() {
+      return transformName;
+    }
+
+    @Override
+    public void noteOutput(WindowedValue<?> output) {
+      executionContext.noteOutput(output);
+    }
+
+    @Override
+    public void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output) {
+      executionContext.noteSideOutput(tag, output);
+    }
+
+    @Override
+    public <T, W extends BoundedWindow> void writePCollectionViewData(
+        TupleTag<?> tag,
+        Iterable<WindowedValue<T>> data, Coder<Iterable<WindowedValue<T>>>
dataCoder,
+        W window, Coder<W> windowCoder) throws IOException {
+      throw new UnsupportedOperationException("Not implemented.");
+    }
+
+    @Override
+    public abstract StateInternals<?> stateInternals();
+
+    @Override
+    public abstract TimerInternals timerInternals();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d2b8e09/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
index 0e4bf75..820bfcd 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
@@ -19,13 +19,13 @@ package org.apache.beam.runners.core;
 
 import java.util.List;
 import org.apache.beam.runners.core.DoFnRunner.ReduceFnExecutor;
+import org.apache.beam.runners.core.ExecutionContext.StepContext;
 import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.ExecutionContext.StepContext;
 import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d2b8e09/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java
new file mode 100644
index 0000000..f67aff4
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import java.io.IOException;
+import java.util.Collection;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * Context for the current execution. This is guaranteed to exist during processing,
+ * but does not necessarily persist between different batches of work.
+ */
+public interface ExecutionContext {
+  /**
+   * Returns the {@link StepContext} associated with the given step.
+   */
+  StepContext getOrCreateStepContext(String stepName, String transformName);
+
+  /**
+   * Returns a collection view of all of the {@link StepContext}s.
+   */
+  Collection<? extends StepContext> getAllStepContexts();
+
+  /**
+   * Hook for subclasses to implement that will be called whenever
+   * {@link org.apache.beam.sdk.transforms.DoFn.Context#output}
+   * is called.
+   */
+  void noteOutput(WindowedValue<?> output);
+
+  /**
+   * Hook for subclasses to implement that will be called whenever
+   * {@link org.apache.beam.sdk.transforms.DoFn.Context#sideOutput}
+   * is called.
+   */
+  void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output);
+
+  /**
+   * Per-step, per-key context used for retrieving state.
+   */
+  public interface StepContext {
+
+    /**
+     * The name of the step.
+     */
+    String getStepName();
+
+    /**
+     * The name of the transform for the step.
+     */
+    String getTransformName();
+
+    /**
+     * Hook for subclasses to implement that will be called whenever
+     * {@link org.apache.beam.sdk.transforms.DoFn.Context#output}
+     * is called.
+     */
+    void noteOutput(WindowedValue<?> output);
+
+    /**
+     * Hook for subclasses to implement that will be called whenever
+     * {@link org.apache.beam.sdk.transforms.DoFn.Context#sideOutput}
+     * is called.
+     */
+    void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output);
+
+    /**
+     * Writes the given {@code PCollectionView} data to a globally accessible location.
+     */
+    <T, W extends BoundedWindow> void writePCollectionViewData(
+        TupleTag<?> tag,
+        Iterable<WindowedValue<T>> data,
+        Coder<Iterable<WindowedValue<T>>> dataCoder,
+        W window,
+        Coder<W> windowCoder)
+            throws IOException;
+
+    StateInternals<?> stateInternals();
+
+    TimerInternals timerInternals();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d2b8e09/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index a7d82bf..b42c57d 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.core.DoFnRunners.OutputManager;
+import org.apache.beam.runners.core.ExecutionContext.StepContext;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Aggregator;
@@ -47,7 +48,6 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.ExecutionContext.StepContext;
 import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.SystemDoFnInternal;
 import org.apache.beam.sdk.util.TimeDomain;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d2b8e09/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
index 342a4a8..1ff0212 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
@@ -26,6 +26,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import org.apache.beam.runners.core.DoFnRunners.OutputManager;
+import org.apache.beam.runners.core.ExecutionContext.StepContext;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
@@ -36,7 +37,6 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.ExecutionContext.StepContext;
 import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.SystemDoFnInternal;
 import org.apache.beam.sdk.util.TimeDomain;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d2b8e09/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
index ec5d375..8ae09cb 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
@@ -26,13 +26,13 @@ import static org.mockito.Mockito.when;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import org.apache.beam.runners.core.BaseExecutionContext.StepContext;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.BaseExecutionContext.StepContext;
 import org.apache.beam.sdk.util.NullSideInputReader;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.Timer;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d2b8e09/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java
b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java
index 0e23dcb..4610069 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java
@@ -22,9 +22,8 @@ import static org.mockito.Mockito.mock;
 
 import java.util.Arrays;
 import java.util.List;
-
+import org.apache.beam.runners.core.BaseExecutionContext.StepContext;
 import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.util.BaseExecutionContext.StepContext;
 import org.apache.beam.sdk.util.SystemDoFnInternal;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d2b8e09/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java
index c7fa4df..fd17704 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java
@@ -28,9 +28,9 @@ import java.util.concurrent.ConcurrentMap;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
 import org.apache.beam.runners.core.AggregatorFactory;
+import org.apache.beam.runners.core.ExecutionContext;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.util.ExecutionContext;
 
 /**
  * AccumT container for the current values associated with {@link Aggregator Aggregators}.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d2b8e09/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
index c6051f0..8250cf1 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
@@ -17,11 +17,11 @@
  */
 package org.apache.beam.runners.direct;
 
+import org.apache.beam.runners.core.BaseExecutionContext;
+import org.apache.beam.runners.core.ExecutionContext;
 import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
 import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
 import org.apache.beam.runners.direct.WatermarkManager.TransformWatermarks;
-import org.apache.beam.sdk.util.BaseExecutionContext;
-import org.apache.beam.sdk.util.ExecutionContext;
 import org.apache.beam.sdk.util.TimerInternals;
 
 /**
@@ -54,7 +54,7 @@ class DirectExecutionContext
    * Step Context for the {@link DirectRunner}.
    */
   public class DirectStepContext
-      extends org.apache.beam.sdk.util.BaseExecutionContext.StepContext {
+      extends BaseExecutionContext.StepContext {
     private CopyOnAccessInMemoryStateInternals<Object> stateInternals;
     private DirectTimerInternals timerInternals;
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d2b8e09/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
index bbcab8e..3b9367a 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
@@ -31,6 +31,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import javax.annotation.Nullable;
+import org.apache.beam.runners.core.ExecutionContext;
 import org.apache.beam.runners.direct.CommittedResult.OutputType;
 import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
@@ -43,7 +44,6 @@ import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.Trigger;
-import org.apache.beam.sdk.util.ExecutionContext;
 import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
 import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.TimerInternals.TimerData;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d2b8e09/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AggregatorContainerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AggregatorContainerTest.java
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AggregatorContainerTest.java
index c8310c9..f770800 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AggregatorContainerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AggregatorContainerTest.java
@@ -24,9 +24,9 @@ import static org.mockito.Mockito.when;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import org.apache.beam.runners.core.ExecutionContext.StepContext;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Sum.SumIntegerFn;
-import org.apache.beam.sdk.util.ExecutionContext.StepContext;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d2b8e09/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 8704308..057a3e7 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -31,6 +31,7 @@ import java.util.Map;
 import org.apache.beam.runners.core.AggregatorFactory;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.core.ExecutionContext;
 import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
 import org.apache.beam.runners.core.SideInputHandler;
 import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
@@ -46,7 +47,6 @@ import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.ExecutionContext;
 import org.apache.beam.sdk.util.NullSideInputReader;
 import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.TimerInternals;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d2b8e09/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
index 9cea529..9855d46 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
@@ -38,6 +38,7 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ScheduledFuture;
 import javax.annotation.Nullable;
+import org.apache.beam.runners.core.ExecutionContext;
 import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItems;
@@ -48,7 +49,6 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.ExecutionContext;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.WindowedValue;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d2b8e09/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java
index 17d5844..fa5c8d1 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java
@@ -22,11 +22,11 @@ import com.google.common.collect.ImmutableList;
 import java.util.Collection;
 import java.util.Map;
 import org.apache.beam.runners.core.AggregatorFactory;
+import org.apache.beam.runners.core.ExecutionContext;
 import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
 import org.apache.beam.sdk.AggregatorValues;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.util.ExecutionContext;
 import org.apache.spark.Accumulator;
 import org.apache.spark.api.java.JavaSparkContext;
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d2b8e09/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
index 3a31cae..9957bf3 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
@@ -24,11 +24,11 @@ import java.io.IOException;
 import java.util.Iterator;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners.OutputManager;
+import org.apache.beam.runners.core.ExecutionContext.StepContext;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.ExecutionContext.StepContext;
 import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.state.StateInternals;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d2b8e09/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BaseExecutionContext.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BaseExecutionContext.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BaseExecutionContext.java
deleted file mode 100644
index e26f2b0..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BaseExecutionContext.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.state.StateInternals;
-import org.apache.beam.sdk.values.TupleTag;
-
-/**
- * Base class for implementations of {@link ExecutionContext}.
- *
- * <p>A concrete subclass should implement {@link #createStepContext} to create the
appropriate
- * {@link StepContext} implementation. Any {@code StepContext} created will
- * be cached for the lifetime of this {@link ExecutionContext}.
- *
- * <p>BaseExecutionContext is generic to allow implementing subclasses to return a
concrete subclass
- * of {@link StepContext} from {@link #getOrCreateStepContext(String, String)} and
- * {@link #getAllStepContexts()} without forcing each subclass to override the method, e.g.
- * <pre>{@code
- * {@literal @}Override
- * StreamingModeExecutionContext.StepContext getOrCreateStepContext(...) {
- *   return (StreamingModeExecutionContext.StepContext) super.getOrCreateStepContext(...);
- * }
- * }</pre>
- *
- * <p>When a subclass of {@code BaseExecutionContext} has been downcast, the return
types of
- * {@link #createStepContext(String, String)},
- * {@link #getOrCreateStepContext(String, String)}, and {@link #getAllStepContexts()}
- * will be appropriately specialized.
- */
-public abstract class BaseExecutionContext<T extends ExecutionContext.StepContext>
-    implements ExecutionContext {
-
-  private Map<String, T> cachedStepContexts = new HashMap<>();
-
-  /**
-   * Implementations should override this to create the specific type
-   * of {@link StepContext} they need.
-   */
-  protected abstract T createStepContext(String stepName, String transformName);
-
-  /**
-   * Returns the {@link StepContext} associated with the given step.
-   */
-  @Override
-  public T getOrCreateStepContext(String stepName, String transformName) {
-    final String finalStepName = stepName;
-    final String finalTransformName = transformName;
-    return getOrCreateStepContext(
-        stepName,
-        new CreateStepContextFunction<T>() {
-          @Override
-          public T create() {
-            return createStepContext(finalStepName, finalTransformName);
-          }
-        });
-  }
-
-  /**
-   * Factory method interface to create an execution context if none exists during
-   * {@link #getOrCreateStepContext(String, CreateStepContextFunction)}.
-   */
-  protected interface CreateStepContextFunction<T extends ExecutionContext.StepContext>
{
-    T create();
-  }
-
-  protected final T getOrCreateStepContext(String stepName,
-      CreateStepContextFunction<T> createContextFunc) {
-    T context = cachedStepContexts.get(stepName);
-    if (context == null) {
-      context = createContextFunc.create();
-      cachedStepContexts.put(stepName, context);
-    }
-
-    return context;
-  }
-
-  /**
-   * Returns a collection view of all of the {@link StepContext}s.
-   */
-  @Override
-  public Collection<? extends T> getAllStepContexts() {
-    return Collections.unmodifiableCollection(cachedStepContexts.values());
-  }
-
-  /**
-   * Hook for subclasses to implement that will be called whenever
-   * {@code DoFn.Context#output}
-   * is called.
-   */
-  @Override
-  public void noteOutput(WindowedValue<?> output) {}
-
-  /**
-   * Hook for subclasses to implement that will be called whenever
-   * {@code DoFn.Context#sideOutput}
-   * is called.
-   */
-  @Override
-  public void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output) {}
-
-  /**
-   * Base class for implementations of {@link ExecutionContext.StepContext}.
-   *
-   * <p>To complete a concrete subclass, implement {@link #timerInternals} and
-   * {@link #stateInternals}.
-   */
-  public abstract static class StepContext implements ExecutionContext.StepContext {
-    private final ExecutionContext executionContext;
-    private final String stepName;
-    private final String transformName;
-
-    public StepContext(ExecutionContext executionContext, String stepName, String transformName)
{
-      this.executionContext = executionContext;
-      this.stepName = stepName;
-      this.transformName = transformName;
-    }
-
-    @Override
-    public String getStepName() {
-      return stepName;
-    }
-
-    @Override
-    public String getTransformName() {
-      return transformName;
-    }
-
-    @Override
-    public void noteOutput(WindowedValue<?> output) {
-      executionContext.noteOutput(output);
-    }
-
-    @Override
-    public void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output) {
-      executionContext.noteSideOutput(tag, output);
-    }
-
-    @Override
-    public <T, W extends BoundedWindow> void writePCollectionViewData(
-        TupleTag<?> tag,
-        Iterable<WindowedValue<T>> data, Coder<Iterable<WindowedValue<T>>>
dataCoder,
-        W window, Coder<W> windowCoder) throws IOException {
-      throw new UnsupportedOperationException("Not implemented.");
-    }
-
-    @Override
-    public abstract StateInternals<?> stateInternals();
-
-    @Override
-    public abstract TimerInternals timerInternals();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d2b8e09/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutionContext.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutionContext.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutionContext.java
deleted file mode 100644
index 4429d76..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutionContext.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import java.io.IOException;
-import java.util.Collection;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.state.StateInternals;
-import org.apache.beam.sdk.values.TupleTag;
-
-/**
- * Context for the current execution. This is guaranteed to exist during processing,
- * but does not necessarily persist between different batches of work.
- */
-public interface ExecutionContext {
-  /**
-   * Returns the {@link StepContext} associated with the given step.
-   */
-  StepContext getOrCreateStepContext(String stepName, String transformName);
-
-  /**
-   * Returns a collection view of all of the {@link StepContext}s.
-   */
-  Collection<? extends StepContext> getAllStepContexts();
-
-  /**
-   * Hook for subclasses to implement that will be called whenever
-   * {@link org.apache.beam.sdk.transforms.DoFn.Context#output}
-   * is called.
-   */
-  void noteOutput(WindowedValue<?> output);
-
-  /**
-   * Hook for subclasses to implement that will be called whenever
-   * {@link org.apache.beam.sdk.transforms.DoFn.Context#sideOutput}
-   * is called.
-   */
-  void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output);
-
-  /**
-   * Per-step, per-key context used for retrieving state.
-   */
-  public interface StepContext {
-
-    /**
-     * The name of the step.
-     */
-    String getStepName();
-
-    /**
-     * The name of the transform for the step.
-     */
-    String getTransformName();
-
-    /**
-     * Hook for subclasses to implement that will be called whenever
-     * {@link org.apache.beam.sdk.transforms.DoFn.Context#output}
-     * is called.
-     */
-    void noteOutput(WindowedValue<?> output);
-
-    /**
-     * Hook for subclasses to implement that will be called whenever
-     * {@link org.apache.beam.sdk.transforms.DoFn.Context#sideOutput}
-     * is called.
-     */
-    void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output);
-
-    /**
-     * Writes the given {@code PCollectionView} data to a globally accessible location.
-     */
-    <T, W extends BoundedWindow> void writePCollectionViewData(
-        TupleTag<?> tag,
-        Iterable<WindowedValue<T>> data,
-        Coder<Iterable<WindowedValue<T>>> dataCoder,
-        W window,
-        Coder<W> windowCoder)
-            throws IOException;
-
-    StateInternals<?> stateInternals();
-
-    TimerInternals timerInternals();
-  }
-}



Mime
View raw message