beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [1/2] incubator-beam git commit: Remove DirectRunner implementations from DoFnTester
Date Wed, 08 Jun 2016 02:56:20 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master 9a8cb95db -> ffbfc66e1


Remove DirectRunner implementations from DoFnTester

Part of the removal of runners-core from the Core SDK, and the deletion
of the DirectRunner from the core SDK.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7f2b3669
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7f2b3669
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7f2b3669

Branch: refs/heads/master
Commit: 7f2b3669af714f732d6433ba6f94fa0d9855bfaa
Parents: 9a8cb95
Author: Thomas Groh <tgroh@google.com>
Authored: Tue May 31 17:09:19 2016 -0700
Committer: Kenneth Knowles <klk@google.com>
Committed: Tue Jun 7 19:41:18 2016 -0700

----------------------------------------------------------------------
 .../org/apache/beam/examples/WordCountTest.java |   2 +-
 .../cookbook/CombinePerKeyExamplesTest.java     |   4 +-
 .../examples/cookbook/FilterExamplesTest.java   |   4 +-
 .../examples/cookbook/JoinExamplesTest.java     |   4 +-
 .../cookbook/MaxPerKeyExamplesTest.java         |   4 +-
 .../examples/cookbook/TriggerExampleTest.java   |   2 +-
 .../examples/complete/game/UserScoreTest.java   |   2 +-
 .../dataflow/DataflowPipelineRunnerTest.java    |  31 +-
 .../apache/beam/sdk/transforms/DoFnTester.java  | 281 +++++++++++++++----
 .../beam/sdk/transforms/DoFnTesterTest.java     |  16 +-
 .../sdk/transforms/join/CoGroupByKeyTest.java   |   2 +-
 .../src/test/java/WordCountTest.java            |   2 +-
 12 files changed, 264 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f2b3669/examples/java/src/test/java/org/apache/beam/examples/WordCountTest.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/WordCountTest.java b/examples/java/src/test/java/org/apache/beam/examples/WordCountTest.java
index c114d6e..1382612 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/WordCountTest.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/WordCountTest.java
@@ -48,7 +48,7 @@ public class WordCountTest {
 
   /** Example test that tests a specific DoFn. */
   @Test
-  public void testExtractWordsFn() {
+  public void testExtractWordsFn() throws Exception {
     DoFnTester<String, String> extractWordsFn =
         DoFnTester.of(new ExtractWordsFn());
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f2b3669/examples/java/src/test/java/org/apache/beam/examples/cookbook/CombinePerKeyExamplesTest.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/cookbook/CombinePerKeyExamplesTest.java b/examples/java/src/test/java/org/apache/beam/examples/cookbook/CombinePerKeyExamplesTest.java
index bb475c4..5d6456a 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/cookbook/CombinePerKeyExamplesTest.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/cookbook/CombinePerKeyExamplesTest.java
@@ -72,7 +72,7 @@ public class CombinePerKeyExamplesTest {
       .set("all_plays", "king_lear,macbeth");
 
   @Test
-  public void testExtractLargeWordsFn() {
+  public void testExtractLargeWordsFn() throws Exception {
     DoFnTester<TableRow, KV<String, String>> extractLargeWordsFn =
         DoFnTester.of(new ExtractLargeWordsFn());
     List<KV<String, String>> results = extractLargeWordsFn.processBatch(ROWS_ARRAY);
@@ -82,7 +82,7 @@ public class CombinePerKeyExamplesTest {
   }
 
   @Test
-  public void testFormatShakespeareOutputFn() {
+  public void testFormatShakespeareOutputFn() throws Exception {
     DoFnTester<KV<String, String>, TableRow> formatShakespeareOutputFn =
         DoFnTester.of(new FormatShakespeareOutputFn());
     List<TableRow> results = formatShakespeareOutputFn.processBatch(COMBINED_TUPLES_ARRAY);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f2b3669/examples/java/src/test/java/org/apache/beam/examples/cookbook/FilterExamplesTest.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/cookbook/FilterExamplesTest.java b/examples/java/src/test/java/org/apache/beam/examples/cookbook/FilterExamplesTest.java
index 26393df..13beab0 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/cookbook/FilterExamplesTest.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/cookbook/FilterExamplesTest.java
@@ -68,7 +68,7 @@ public class FilterExamplesTest {
 
 
   @Test
-  public void testProjectionFn() {
+  public void testProjectionFn() throws Exception {
     DoFnTester<TableRow, TableRow> projectionFn =
         DoFnTester.of(new ProjectionFn());
     List<TableRow> results = projectionFn.processBatch(ROWS_ARRAY);
@@ -78,7 +78,7 @@ public class FilterExamplesTest {
   }
 
   @Test
-  public void testFilterSingleMonthDataFn() {
+  public void testFilterSingleMonthDataFn() throws Exception {
     DoFnTester<TableRow, TableRow> filterSingleMonthDataFn =
         DoFnTester.of(new FilterSingleMonthDataFn(7));
     List<TableRow> results = filterSingleMonthDataFn.processBatch(PROJROWS_ARRAY);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f2b3669/examples/java/src/test/java/org/apache/beam/examples/cookbook/JoinExamplesTest.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/cookbook/JoinExamplesTest.java b/examples/java/src/test/java/org/apache/beam/examples/cookbook/JoinExamplesTest.java
index c7827d1..22fe6a1 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/cookbook/JoinExamplesTest.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/cookbook/JoinExamplesTest.java
@@ -84,7 +84,7 @@ public class JoinExamplesTest {
     };
 
   @Test
-  public void testExtractEventDataFn() {
+  public void testExtractEventDataFn() throws Exception {
     DoFnTester<TableRow, KV<String, String>> extractEventDataFn =
         DoFnTester.of(new ExtractEventDataFn());
     List<KV<String, String>> results = extractEventDataFn.processBatch(EVENTS);
@@ -93,7 +93,7 @@ public class JoinExamplesTest {
   }
 
   @Test
-  public void testExtractCountryInfoFn() {
+  public void testExtractCountryInfoFn() throws Exception {
     DoFnTester<TableRow, KV<String, String>> extractCountryInfoFn =
         DoFnTester.of(new ExtractCountryInfoFn());
     List<KV<String, String>> results = extractCountryInfoFn.processBatch(CCS);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f2b3669/examples/java/src/test/java/org/apache/beam/examples/cookbook/MaxPerKeyExamplesTest.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/cookbook/MaxPerKeyExamplesTest.java b/examples/java/src/test/java/org/apache/beam/examples/cookbook/MaxPerKeyExamplesTest.java
index 0a84621..9e129a1 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/cookbook/MaxPerKeyExamplesTest.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/cookbook/MaxPerKeyExamplesTest.java
@@ -66,7 +66,7 @@ public class MaxPerKeyExamplesTest {
 
 
   @Test
-  public void testExtractTempFn() {
+  public void testExtractTempFn() throws Exception {
     DoFnTester<TableRow, KV<Integer, Double>> extractTempFn =
         DoFnTester.of(new ExtractTempFn());
     List<KV<Integer, Double>> results = extractTempFn.processBatch(TEST_ROWS);
@@ -76,7 +76,7 @@ public class MaxPerKeyExamplesTest {
   }
 
   @Test
-  public void testFormatMaxesFn() {
+  public void testFormatMaxesFn() throws Exception {
     DoFnTester<KV<Integer, Double>, TableRow> formatMaxesFnFn =
         DoFnTester.of(new FormatMaxesFn());
     List<TableRow> results = formatMaxesFnFn.processBatch(TEST_KVS);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f2b3669/examples/java/src/test/java/org/apache/beam/examples/cookbook/TriggerExampleTest.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/cookbook/TriggerExampleTest.java b/examples/java/src/test/java/org/apache/beam/examples/cookbook/TriggerExampleTest.java
index 4f15aef..fe75d14 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/cookbook/TriggerExampleTest.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/cookbook/TriggerExampleTest.java
@@ -87,7 +87,7 @@ public class TriggerExampleTest {
       .set("window", "[1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z)");
 
   @Test
-  public void testExtractTotalFlow() {
+  public void testExtractTotalFlow() throws Exception {
     DoFnTester<String, KV<String, Integer>> extractFlowInfow = DoFnTester
         .of(new ExtractFlowInfo());
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f2b3669/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java
index 40740a6..842ea30 100644
--- a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java
+++ b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java
@@ -85,7 +85,7 @@ public class UserScoreTest implements Serializable {
 
   /** Test the ParseEventFn DoFn. */
   @Test
-  public void testParseEventFn() {
+  public void testParseEventFn() throws Exception {
     DoFnTester<String, GameActionInfo> parseEventFn =
         DoFnTester.of(new ParseEventFn());
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f2b3669/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRunnerTest.java
index 66c2feb..aa65dd1 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRunnerTest.java
@@ -18,7 +18,6 @@
 package org.apache.beam.runners.dataflow;
 
 import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow;
-
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
@@ -76,7 +75,6 @@ import org.apache.beam.sdk.util.GcsUtil;
 import org.apache.beam.sdk.util.NoopPathValidator;
 import org.apache.beam.sdk.util.ReleaseInfo;
 import org.apache.beam.sdk.util.TestCredential;
-import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
 import org.apache.beam.sdk.util.WindowingStrategy;
@@ -976,18 +974,12 @@ public class DataflowPipelineRunnerTest {
         new BatchViewAsSingleton.IsmRecordForSingularValuePerWindowDoFn
         <String, GlobalWindow>(GlobalWindow.Coder.INSTANCE));
 
-    try {
-      doFnTester.processBatch(
-          ImmutableList.of(KV.<Integer, Iterable<KV<GlobalWindow, WindowedValue<String>>>>of(
-              0, ImmutableList.of(
-                  KV.of(GlobalWindow.INSTANCE, valueInGlobalWindow("a")),
-                  KV.of(GlobalWindow.INSTANCE, valueInGlobalWindow("b"))))));
-      fail("Expected UserCodeException");
-    } catch (UserCodeException e) {
-      assertTrue(e.getCause() instanceof IllegalStateException);
-      IllegalStateException rootCause = (IllegalStateException) e.getCause();
-      assertThat(rootCause.getMessage(), containsString("found for singleton within window"));
-    }
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage("found for singleton within window");
+    doFnTester.processBatch(ImmutableList.of(
+        KV.<Integer, Iterable<KV<GlobalWindow, WindowedValue<String>>>>of(0,
+            ImmutableList.of(KV.of(GlobalWindow.INSTANCE, valueInGlobalWindow("a")),
+                KV.of(GlobalWindow.INSTANCE, valueInGlobalWindow("b"))))));
   }
 
   @Test
@@ -1192,14 +1184,9 @@ public class DataflowPipelineRunnerTest {
                 KV.of(KV.of(1L, windowA),
                     WindowedValue.of(111L, new Instant(2), windowA, PaneInfo.NO_FIRING)))));
 
-    try {
-      doFnTester.processBatch(inputElements);
-      fail("Expected UserCodeException");
-    } catch (UserCodeException e) {
-      assertTrue(e.getCause() instanceof IllegalStateException);
-      IllegalStateException rootCause = (IllegalStateException) e.getCause();
-      assertThat(rootCause.getMessage(), containsString("Unique keys are expected but found key"));
-    }
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage("Unique keys are expected but found key");
+    doFnTester.processBatch(inputElements);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f2b3669/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index 16dc731..332ea13 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -20,22 +20,19 @@ package org.apache.beam.sdk.transforms;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.util.DirectModeExecutionContext;
-import org.apache.beam.sdk.util.DirectSideInputReader;
-import org.apache.beam.sdk.util.DoFnRunner;
-import org.apache.beam.sdk.util.DoFnRunnerBase;
-import org.apache.beam.sdk.util.DoFnRunners;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.PTuple;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.util.common.Counter;
-import org.apache.beam.sdk.util.common.CounterSet;
+import org.apache.beam.sdk.util.WindowingInternals;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 
 import com.google.common.base.Function;
+import com.google.common.base.MoreObjects;
 import com.google.common.base.Objects;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
@@ -44,6 +41,7 @@ import org.joda.time.Instant;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -165,7 +163,7 @@ public class DoFnTester<InputT, OutputT> {
    * calls {@link #finishBundle}, then returns the result of
    * {@link #takeOutputElements}.
    */
-  public List<OutputT> processBatch(Iterable <? extends InputT> inputElements) {
+  public List<OutputT> processBatch(Iterable <? extends InputT> inputElements) throws Exception {
     startBundle();
     for (InputT inputElement : inputElements) {
       processElement(inputElement);
@@ -186,7 +184,7 @@ public class DoFnTester<InputT, OutputT> {
    * </ol>
    */
   @SafeVarargs
-  public final List<OutputT> processBatch(InputT... inputElements) {
+  public final List<OutputT> processBatch(InputT... inputElements) throws Exception {
     return processBatch(Arrays.asList(inputElements));
   }
 
@@ -195,10 +193,12 @@ public class DoFnTester<InputT, OutputT> {
    *
    * <p>If needed, first creates a fresh instance of the DoFn under test.
    */
-  public void startBundle() {
+  public void startBundle() throws Exception {
     resetState();
     initializeState();
-    fnRunner.startBundle();
+    TestContext<InputT, OutputT> context = createContext(fn);
+    context.setupDelegateAggregators();
+    fn.startBundle(context);
     state = State.STARTED;
   }
 
@@ -213,14 +213,14 @@ public class DoFnTester<InputT, OutputT> {
    * @throws IllegalStateException if the {@code DoFn} under test has already
    * been finished
    */
-  public void processElement(InputT element) {
+  public void processElement(InputT element) throws Exception {
     if (state == State.FINISHED) {
       throw new IllegalStateException("finishBundle() has already been called");
     }
     if (state == State.UNSTARTED) {
       startBundle();
     }
-    fnRunner.processElement(WindowedValue.valueInGlobalWindow(element));
+    fn.processElement(createProcessContext(fn, element));
   }
 
   /**
@@ -232,14 +232,14 @@ public class DoFnTester<InputT, OutputT> {
    * @throws IllegalStateException if the {@code DoFn} under test has already
    * been finished
    */
-  public void finishBundle() {
+  public void finishBundle() throws Exception {
     if (state == State.FINISHED) {
       throw new IllegalStateException("finishBundle() has already been called");
     }
     if (state == State.UNSTARTED) {
       startBundle();
     }
-    fnRunner.finishBundle();
+    fn.finishBundle(createContext(fn));
     state = State.FINISHED;
   }
 
@@ -276,8 +276,7 @@ public class DoFnTester<InputT, OutputT> {
   @Experimental
   public List<OutputElementWithTimestamp<OutputT>> peekOutputElementsWithTimestamp() {
     // TODO: Should we return an unmodifiable list?
-    return Lists.transform(
-        outputManager.getOutput(mainOutputTag),
+    return Lists.transform(getOutput(mainOutputTag),
         new Function<Object, OutputElementWithTimestamp<OutputT>>() {
           @Override
           @SuppressWarnings("unchecked")
@@ -336,8 +335,7 @@ public class DoFnTester<InputT, OutputT> {
    */
   public <T> List<T> peekSideOutputElements(TupleTag<T> tag) {
     // TODO: Should we return an unmodifiable list?
-    return Lists.transform(
-        outputManager.getOutput(tag),
+    return Lists.transform(getOutput(tag),
         new Function<WindowedValue<T>, T>() {
           @SuppressWarnings("unchecked")
           @Override
@@ -372,11 +370,17 @@ public class DoFnTester<InputT, OutputT> {
    * Returns the value of the provided {@link Aggregator}.
    */
   public <AggregateT> AggregateT getAggregatorValue(Aggregator<?, AggregateT> agg) {
+    return extractAggregatorValue(agg.getName(), agg.getCombineFn());
+  }
+
+  private <AccumT, AggregateT> AggregateT extractAggregatorValue(
+      String name, CombineFn<?, AccumT, AggregateT> combiner) {
     @SuppressWarnings("unchecked")
-    Counter<AggregateT> counter =
-        (Counter<AggregateT>)
-            counterSet.getExistingCounter("user-" + STEP_NAME + "-" + agg.getName());
-    return counter.getAggregate();
+    AccumT accumulator = (AccumT) accumulators.get(name);
+    if (accumulator == null) {
+      accumulator = combiner.createAccumulator();
+    }
+    return combiner.extractOutput(accumulator);
   }
 
   /**
@@ -415,6 +419,205 @@ public class DoFnTester<InputT, OutputT> {
     }
   }
 
+  private <T> List<WindowedValue<T>> getOutput(TupleTag<T> tag) {
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    List<WindowedValue<T>> elems = (List) outputs.get(tag);
+    return MoreObjects.firstNonNull(elems, Collections.<WindowedValue<T>>emptyList());
+  }
+
+  private TestContext<InputT, OutputT> createContext(DoFn<InputT, OutputT> fn) {
+    return new TestContext<>(fn, options, mainOutputTag, outputs, accumulators);
+  }
+
+  private static class TestContext<InT, OutT> extends DoFn<InT, OutT>.Context {
+    private final PipelineOptions opts;
+    private final TupleTag<OutT> mainOutputTag;
+    private final Map<TupleTag<?>, List<WindowedValue<?>>> outputs;
+    private final Map<String, Object> accumulators;
+
+    public TestContext(
+        DoFn<InT, OutT> fn,
+        PipelineOptions opts,
+        TupleTag<OutT> mainOutputTag,
+        Map<TupleTag<?>, List<WindowedValue<?>>> outputs,
+        Map<String, Object> accumulators) {
+      fn.super();
+      this.opts = opts;
+      this.mainOutputTag = mainOutputTag;
+      this.outputs = outputs;
+      this.accumulators = accumulators;
+    }
+
+    @Override
+    public PipelineOptions getPipelineOptions() {
+      return opts;
+    }
+
+    @Override
+    public void output(OutT output) {
+      sideOutput(mainOutputTag, output);
+    }
+
+    @Override
+    public void outputWithTimestamp(OutT output, Instant timestamp) {
+      sideOutputWithTimestamp(mainOutputTag, output, timestamp);
+    }
+
+    @Override
+    protected <AggInT, AggOutT> Aggregator<AggInT, AggOutT> createAggregatorInternal(
+        final String name, final CombineFn<AggInT, ?, AggOutT> combiner) {
+      return aggregator(name, combiner);
+    }
+
+    private <AinT, AccT, AoutT> Aggregator<AinT, AoutT> aggregator(
+        final String name,
+        final CombineFn<AinT, AccT, AoutT> combiner) {
+      Aggregator<AinT, AoutT> aggregator = new Aggregator<AinT, AoutT>() {
+        @Override
+        public void addValue(AinT value) {
+          AccT accum = (AccT) accumulators.get(name);
+          AccT newAccum = combiner.addInput(accum, value);
+          accumulators.put(name, newAccum);
+        }
+
+        @Override
+        public String getName() {
+          return name;
+        }
+
+        @Override
+        public CombineFn<AinT, ?, AoutT> getCombineFn() {
+          return combiner;
+        }
+      };
+      accumulators.put(name, combiner.createAccumulator());
+      return aggregator;
+    }
+
+    @Override
+    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+
+    }
+
+    @Override
+    public <T> void sideOutput(TupleTag<T> tag, T output) {
+      sideOutputWithTimestamp(tag, output, BoundedWindow.TIMESTAMP_MIN_VALUE);
+    }
+
+    public <T> void noteOutput(TupleTag<T> tag, WindowedValue<T> output) {
+      getOutputList(tag).add(output);
+    }
+
+    private <T> List<WindowedValue<T>> getOutputList(TupleTag<T> tag) {
+      @SuppressWarnings({"unchecked", "rawtypes"})
+      List<WindowedValue<T>> outputList = (List) outputs.get(tag);
+      if (outputList == null) {
+        outputList = new ArrayList<>();
+        outputs.put(tag, (List) outputList);
+      }
+      return outputList;
+    }
+  }
+
+  private TestProcessContext<InputT, OutputT> createProcessContext(
+      DoFn<InputT, OutputT> fn,
+      InputT elem) {
+    return new TestProcessContext<>(fn,
+        createContext(fn),
+        WindowedValue.valueInGlobalWindow(elem),
+        mainOutputTag,
+        sideInputs);
+  }
+
+  private static class TestProcessContext<InT, OutT> extends DoFn<InT, OutT>.ProcessContext {
+    private final TestContext<InT, OutT> context;
+    private final TupleTag<OutT> mainOutputTag;
+    private final WindowedValue<InT> element;
+    private final Map<PCollectionView<?>, ?> sideInputs;
+
+    private TestProcessContext(
+        DoFn<InT, OutT> fn,
+        TestContext<InT, OutT> context,
+        WindowedValue<InT> element,
+        TupleTag<OutT> mainOutputTag,
+        Map<PCollectionView<?>, ?> sideInputs) {
+      fn.super();
+      this.context = context;
+      this.element = element;
+      this.mainOutputTag = mainOutputTag;
+      this.sideInputs = sideInputs;
+    }
+
+    @Override
+    public InT element() {
+      return element.getValue();
+    }
+
+    @Override
+    public <T> T sideInput(PCollectionView<T> view) {
+      @SuppressWarnings("unchecked")
+      T sideInput = (T) sideInputs.get(view);
+      return sideInput;
+    }
+
+    @Override
+    public Instant timestamp() {
+      return element.getTimestamp();
+    }
+
+    @Override
+    public BoundedWindow window() {
+      return Iterables.getOnlyElement(element.getWindows());
+    }
+
+    @Override
+    public PaneInfo pane() {
+      return element.getPane();
+    }
+
+    @Override
+    public WindowingInternals<InT, OutT> windowingInternals() {
+      throw new UnsupportedOperationException(
+          "WindowingInternals is an internal implementation detail of the Beam SDK, "
+              + "and should not be used by user code");
+    }
+
+    @Override
+    public PipelineOptions getPipelineOptions() {
+      return context.getPipelineOptions();
+    }
+
+    @Override
+    public void output(OutT output) {
+      sideOutput(mainOutputTag, output);
+    }
+
+    @Override
+    public void outputWithTimestamp(OutT output, Instant timestamp) {
+      sideOutputWithTimestamp(mainOutputTag, output, timestamp);
+    }
+
+    @Override
+    public <T> void sideOutput(TupleTag<T> tag, T output) {
+      sideOutputWithTimestamp(tag, output, element.getTimestamp());
+    }
+
+    @Override
+    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+      context.noteOutput(tag,
+          WindowedValue.of(output, timestamp, element.getWindows(), element.getPane()));
+    }
+
+    @Override
+    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
+        String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
+      throw new IllegalStateException("Aggregators should not be created within ProcessContext. "
+          + "Instead, create an aggregator at DoFn construction time with createAggregator, and "
+          + "ensure they are set up by the time startBundle is called "
+          + "with setupDelegateAggregators.");
+    }
+  }
+
   /////////////////////////////////////////////////////////////////////////////
 
   /** The possible states of processing a DoFn. */
@@ -438,6 +641,8 @@ public class DoFnTester<InputT, OutputT> {
   private Map<PCollectionView<?>, Iterable<WindowedValue<?>>> sideInputs =
       new HashMap<>();
 
+  private Map<String, Object> accumulators;
+
   /** The output tags used by the DoFn under test. */
   TupleTag<OutputT> mainOutputTag = new TupleTag<>();
   List<TupleTag<?>> sideOutputTags = new ArrayList<>();
@@ -446,13 +651,7 @@ public class DoFnTester<InputT, OutputT> {
   DoFn<InputT, OutputT> fn;
 
   /** The ListOutputManager to examine the outputs. */
-  DoFnRunnerBase.ListOutputManager outputManager;
-
-  /** The DoFnRunner if processing is in progress. */
-  DoFnRunner<InputT, OutputT> fnRunner;
-
-  /** Counters for user-defined Aggregators if processing is in progress. */
-  CounterSet counterSet;
+  Map<TupleTag<?>, List<WindowedValue<?>>> outputs;
 
   /** The state of processing of the DoFn under test. */
   State state;
@@ -464,9 +663,8 @@ public class DoFnTester<InputT, OutputT> {
 
   void resetState() {
     fn = null;
-    outputManager = null;
-    fnRunner = null;
-    counterSet = null;
+    outputs = null;
+    accumulators = null;
     state = State.UNSTARTED;
   }
 
@@ -476,23 +674,12 @@ public class DoFnTester<InputT, OutputT> {
         SerializableUtils.deserializeFromByteArray(
             SerializableUtils.serializeToByteArray(origFn),
             origFn.toString());
-    counterSet = new CounterSet();
     PTuple runnerSideInputs = PTuple.empty();
     for (Map.Entry<PCollectionView<?>, Iterable<WindowedValue<?>>> entry
         : sideInputs.entrySet()) {
       runnerSideInputs = runnerSideInputs.and(entry.getKey().getTagInternal(), entry.getValue());
     }
-    outputManager = new DoFnRunnerBase.ListOutputManager();
-    fnRunner =
-        DoFnRunners.createDefault(
-            options,
-            fn,
-            DirectSideInputReader.of(runnerSideInputs),
-            outputManager,
-            mainOutputTag,
-            sideOutputTags,
-            DirectModeExecutionContext.create().getOrCreateStepContext(STEP_NAME, TRANSFORM_NAME),
-            counterSet.getAddCounterMutator(),
-            WindowingStrategy.globalDefault());
+    outputs = new HashMap<>();
+    accumulators = new HashMap<>();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f2b3669/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
index 4df5be7..ec22251 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
@@ -17,8 +17,8 @@
  */
 package org.apache.beam.sdk.transforms;
 
-import static org.hamcrest.CoreMatchers.hasItems;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasItems;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
@@ -39,7 +39,7 @@ import java.util.List;
 public class DoFnTesterTest {
 
   @Test
-  public void processElement() {
+  public void processElement() throws Exception {
     CounterDoFn counterDoFn = new CounterDoFn();
     DoFnTester<Long, String> tester = DoFnTester.of(counterDoFn);
 
@@ -61,7 +61,7 @@ public class DoFnTesterTest {
   }
 
   @Test
-  public void processElementsWithPeeks() {
+  public void processElementsWithPeeks() throws Exception {
     CounterDoFn counterDoFn = new CounterDoFn();
     DoFnTester<Long, String> tester = DoFnTester.of(counterDoFn);
 
@@ -119,7 +119,7 @@ public class DoFnTesterTest {
   }
 
   @Test
-  public void processBatch() {
+  public void processBatch() throws Exception {
     CounterDoFn counterDoFn = new CounterDoFn();
     DoFnTester<Long, String> tester = DoFnTester.of(counterDoFn);
 
@@ -138,7 +138,7 @@ public class DoFnTesterTest {
   }
 
   @Test
-  public void processElementWithTimestamp() {
+  public void processElementWithTimestamp() throws Exception {
     CounterDoFn counterDoFn = new CounterDoFn();
     DoFnTester<Long, String> tester = DoFnTester.of(counterDoFn);
 
@@ -175,7 +175,7 @@ public class DoFnTesterTest {
   }
 
   @Test
-  public void getAggregatorValuesShouldGetValueOfCounter() {
+  public void getAggregatorValuesShouldGetValueOfCounter() throws Exception {
     CounterDoFn counterDoFn = new CounterDoFn();
     DoFnTester<Long, String> tester = DoFnTester.of(counterDoFn);
     tester.processBatch(1L, 2L, 4L, 8L);
@@ -186,7 +186,7 @@ public class DoFnTesterTest {
   }
 
   @Test
-  public void getAggregatorValuesWithEmptyCounterShouldSucceed() {
+  public void getAggregatorValuesWithEmptyCounterShouldSucceed() throws Exception {
     CounterDoFn counterDoFn = new CounterDoFn();
     DoFnTester<Long, String> tester = DoFnTester.of(counterDoFn);
     tester.processBatch();
@@ -196,7 +196,7 @@ public class DoFnTesterTest {
   }
 
   @Test
-  public void getAggregatorValuesInStartFinishBundleShouldGetValues() {
+  public void getAggregatorValuesInStartFinishBundleShouldGetValues() throws Exception {
     CounterDoFn fn = new CounterDoFn(1L, 2L);
     DoFnTester<Long, String> tester = DoFnTester.of(fn);
     tester.processBatch(0L, 0L);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f2b3669/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
index f8e7f08..d99c536 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
@@ -406,7 +406,7 @@ public class CoGroupByKeyTest implements Serializable {
    */
   @SuppressWarnings("unchecked")
   @Test
-  public void testConsumingDoFn() {
+  public void testConsumingDoFn() throws Exception {
     TupleTag<String> purchasesTag = new TupleTag<>();
     TupleTag<String> addressesTag = new TupleTag<>();
     TupleTag<String> namesTag = new TupleTag<>();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f2b3669/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/test/java/WordCountTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/test/java/WordCountTest.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/test/java/WordCountTest.java
index b921875..4b26198 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/test/java/WordCountTest.java
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/test/java/WordCountTest.java
@@ -48,7 +48,7 @@ public class WordCountTest {
 
   /** Example test that tests a specific DoFn. */
   @Test
-  public void testExtractWordsFn() {
+  public void testExtractWordsFn() throws Exception {
     DoFnTester<String, String> extractWordsFn =
         DoFnTester.of(new ExtractWordsFn());
 


Mime
View raw message