beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [02/17] incubator-beam git commit: Move InProcessRunner to its own module
Date Fri, 29 Apr 2016 21:56:00 GMT
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContextTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContextTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContextTest.java
deleted file mode 100644
index d1ea51a..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContextTest.java
+++ /dev/null
@@ -1,526 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.emptyIterable;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.not;
-import static org.junit.Assert.assertThat;
-
-import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.io.CountingInput;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.runners.inprocess.InMemoryWatermarkManager.FiredTimers;
-import org.apache.beam.sdk.runners.inprocess.InMemoryWatermarkManager.TimerUpdate;
-import org.apache.beam.sdk.runners.inprocess.InProcessExecutionContext.InProcessStepContext;
-import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.PCollectionViewWriter;
-import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.transforms.WithKeys;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
-import org.apache.beam.sdk.util.SideInputReader;
-import org.apache.beam.sdk.util.TimeDomain;
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.util.common.Counter;
-import org.apache.beam.sdk.util.common.Counter.AggregationKind;
-import org.apache.beam.sdk.util.common.CounterSet;
-import org.apache.beam.sdk.util.state.BagState;
-import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals;
-import org.apache.beam.sdk.util.state.StateNamespaces;
-import org.apache.beam.sdk.util.state.StateTag;
-import org.apache.beam.sdk.util.state.StateTags;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollection.IsBounded;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PValue;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-
-import org.hamcrest.Matchers;
-import org.joda.time.Instant;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Tests for {@link InProcessEvaluationContext}.
- */
-@RunWith(JUnit4.class)
-public class InProcessEvaluationContextTest {
-  private TestPipeline p;
-  private InProcessEvaluationContext context;
-
-  private PCollection<Integer> created;
-  private PCollection<KV<String, Integer>> downstream;
-  private PCollectionView<Iterable<Integer>> view;
-  private PCollection<Long> unbounded;
-  private Collection<AppliedPTransform<?, ?, ?>> rootTransforms;
-  private Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers;
-
-  private BundleFactory bundleFactory;
-
-  @Before
-  public void setup() {
-    InProcessPipelineRunner runner =
-        InProcessPipelineRunner.fromOptions(PipelineOptionsFactory.create());
-
-    p = TestPipeline.create();
-
-    created = p.apply(Create.of(1, 2, 3));
-    downstream = created.apply(WithKeys.<String, Integer>of("foo"));
-    view = created.apply(View.<Integer>asIterable());
-    unbounded = p.apply(CountingInput.unbounded());
-
-    ConsumerTrackingPipelineVisitor cVis = new ConsumerTrackingPipelineVisitor();
-    p.traverseTopologically(cVis);
-    rootTransforms = cVis.getRootTransforms();
-    valueToConsumers = cVis.getValueToConsumers();
-
-    bundleFactory = InProcessBundleFactory.create();
-
-    context =
-        InProcessEvaluationContext.create(
-            runner.getPipelineOptions(),
-            InProcessBundleFactory.create(),
-            rootTransforms,
-            valueToConsumers,
-            cVis.getStepNames(),
-            cVis.getViews());
-  }
-
-  @Test
-  public void writeToViewWriterThenReadReads() {
-    PCollectionViewWriter<Integer, Iterable<Integer>> viewWriter =
-        context.createPCollectionViewWriter(
-            PCollection.<Iterable<Integer>>createPrimitiveOutputInternal(
-                p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED),
-            view);
-    BoundedWindow window = new TestBoundedWindow(new Instant(1024L));
-    BoundedWindow second = new TestBoundedWindow(new Instant(899999L));
-    WindowedValue<Integer> firstValue =
-        WindowedValue.of(1, new Instant(1222), window, PaneInfo.ON_TIME_AND_ONLY_FIRING);
-    WindowedValue<Integer> secondValue =
-        WindowedValue.of(
-            2, new Instant(8766L), second, PaneInfo.createPane(true, false, Timing.ON_TIME, 0, 0));
-    Iterable<WindowedValue<Integer>> values = ImmutableList.of(firstValue, secondValue);
-    viewWriter.add(values);
-
-    SideInputReader reader =
-        context.createSideInputReader(ImmutableList.<PCollectionView<?>>of(view));
-    assertThat(reader.get(view, window), containsInAnyOrder(1));
-    assertThat(reader.get(view, second), containsInAnyOrder(2));
-
-    WindowedValue<Integer> overrittenSecondValue =
-        WindowedValue.of(
-            4444, new Instant(8677L), second, PaneInfo.createPane(false, true, Timing.LATE, 1, 1));
-    viewWriter.add(Collections.singleton(overrittenSecondValue));
-    assertThat(reader.get(view, second), containsInAnyOrder(4444));
-  }
-
-  @Test
-  public void getExecutionContextSameStepSameKeyState() {
-    InProcessExecutionContext fooContext =
-        context.getExecutionContext(created.getProducingTransformInternal(), "foo");
-
-    StateTag<Object, BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
-
-    InProcessStepContext stepContext = fooContext.getOrCreateStepContext("s1", "s1");
-    stepContext.stateInternals().state(StateNamespaces.global(), intBag).add(1);
-
-    context.handleResult(
-        InProcessBundleFactory.create()
-            .createKeyedBundle(null, "foo", created)
-            .commit(Instant.now()),
-        ImmutableList.<TimerData>of(),
-        StepTransformResult.withoutHold(created.getProducingTransformInternal())
-            .withState(stepContext.commitState())
-            .build());
-
-    InProcessExecutionContext secondFooContext =
-        context.getExecutionContext(created.getProducingTransformInternal(), "foo");
-    assertThat(
-        secondFooContext
-            .getOrCreateStepContext("s1", "s1")
-            .stateInternals()
-            .state(StateNamespaces.global(), intBag)
-            .read(),
-        contains(1));
-  }
-
-
-  @Test
-  public void getExecutionContextDifferentKeysIndependentState() {
-    InProcessExecutionContext fooContext =
-        context.getExecutionContext(created.getProducingTransformInternal(), "foo");
-
-    StateTag<Object, BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
-
-    fooContext
-        .getOrCreateStepContext("s1", "s1")
-        .stateInternals()
-        .state(StateNamespaces.global(), intBag)
-        .add(1);
-
-    InProcessExecutionContext barContext =
-        context.getExecutionContext(created.getProducingTransformInternal(), "bar");
-    assertThat(barContext, not(equalTo(fooContext)));
-    assertThat(
-        barContext
-            .getOrCreateStepContext("s1", "s1")
-            .stateInternals()
-            .state(StateNamespaces.global(), intBag)
-            .read(),
-        emptyIterable());
-  }
-
-  @Test
-  public void getExecutionContextDifferentStepsIndependentState() {
-    String myKey = "foo";
-    InProcessExecutionContext fooContext =
-        context.getExecutionContext(created.getProducingTransformInternal(), myKey);
-
-    StateTag<Object, BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
-
-    fooContext
-        .getOrCreateStepContext("s1", "s1")
-        .stateInternals()
-        .state(StateNamespaces.global(), intBag)
-        .add(1);
-
-    InProcessExecutionContext barContext =
-        context.getExecutionContext(downstream.getProducingTransformInternal(), myKey);
-    assertThat(
-        barContext
-            .getOrCreateStepContext("s1", "s1")
-            .stateInternals()
-            .state(StateNamespaces.global(), intBag)
-            .read(),
-        emptyIterable());
-  }
-
-  @Test
-  public void handleResultMergesCounters() {
-    CounterSet counters = context.createCounterSet();
-    Counter<Long> myCounter = Counter.longs("foo", AggregationKind.SUM);
-    counters.addCounter(myCounter);
-
-    myCounter.addValue(4L);
-    InProcessTransformResult result =
-        StepTransformResult.withoutHold(created.getProducingTransformInternal())
-            .withCounters(counters)
-            .build();
-    context.handleResult(null, ImmutableList.<TimerData>of(), result);
-    assertThat((Long) context.getCounters().getExistingCounter("foo").getAggregate(), equalTo(4L));
-
-    CounterSet againCounters = context.createCounterSet();
-    Counter<Long> myLongCounterAgain = Counter.longs("foo", AggregationKind.SUM);
-    againCounters.add(myLongCounterAgain);
-    myLongCounterAgain.addValue(8L);
-
-    InProcessTransformResult secondResult =
-        StepTransformResult.withoutHold(downstream.getProducingTransformInternal())
-            .withCounters(againCounters)
-            .build();
-    context.handleResult(
-        context.createRootBundle(created).commit(Instant.now()),
-        ImmutableList.<TimerData>of(),
-        secondResult);
-    assertThat((Long) context.getCounters().getExistingCounter("foo").getAggregate(), equalTo(12L));
-  }
-
-  @Test
-  public void handleResultStoresState() {
-    String myKey = "foo";
-    InProcessExecutionContext fooContext =
-        context.getExecutionContext(downstream.getProducingTransformInternal(), myKey);
-
-    StateTag<Object, BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
-
-    CopyOnAccessInMemoryStateInternals<Object> state =
-        fooContext.getOrCreateStepContext("s1", "s1").stateInternals();
-    BagState<Integer> bag = state.state(StateNamespaces.global(), intBag);
-    bag.add(1);
-    bag.add(2);
-    bag.add(4);
-
-    InProcessTransformResult stateResult =
-        StepTransformResult.withoutHold(downstream.getProducingTransformInternal())
-            .withState(state)
-            .build();
-
-    context.handleResult(
-        context.createKeyedBundle(null, myKey, created).commit(Instant.now()),
-        ImmutableList.<TimerData>of(),
-        stateResult);
-
-    InProcessExecutionContext afterResultContext =
-        context.getExecutionContext(downstream.getProducingTransformInternal(), myKey);
-
-    CopyOnAccessInMemoryStateInternals<Object> afterResultState =
-        afterResultContext.getOrCreateStepContext("s1", "s1").stateInternals();
-    assertThat(afterResultState.state(StateNamespaces.global(), intBag).read(), contains(1, 2, 4));
-  }
-
-  @Test
-  public void callAfterOutputMustHaveBeenProducedAfterEndOfWatermarkCallsback() throws Exception {
-    final CountDownLatch callLatch = new CountDownLatch(1);
-    Runnable callback =
-        new Runnable() {
-          @Override
-          public void run() {
-            callLatch.countDown();
-          }
-        };
-
-    // Should call back after the end of the global window
-    context.scheduleAfterOutputWouldBeProduced(
-        downstream, GlobalWindow.INSTANCE, WindowingStrategy.globalDefault(), callback);
-
-    InProcessTransformResult result =
-        StepTransformResult.withHold(created.getProducingTransformInternal(), new Instant(0))
-            .build();
-
-    context.handleResult(null, ImmutableList.<TimerData>of(), result);
-
-    // Difficult to demonstrate that we took no action in a multithreaded world; poll for a bit
-    // will likely be flaky if this logic is broken
-    assertThat(callLatch.await(500L, TimeUnit.MILLISECONDS), is(false));
-
-    InProcessTransformResult finishedResult =
-        StepTransformResult.withoutHold(created.getProducingTransformInternal()).build();
-    context.handleResult(null, ImmutableList.<TimerData>of(), finishedResult);
-    // Obtain the value via blocking call
-    assertThat(callLatch.await(1, TimeUnit.SECONDS), is(true));
-  }
-
-  @Test
-  public void callAfterOutputMustHaveBeenProducedAlreadyAfterCallsImmediately() throws Exception {
-    InProcessTransformResult finishedResult =
-        StepTransformResult.withoutHold(created.getProducingTransformInternal()).build();
-    context.handleResult(null, ImmutableList.<TimerData>of(), finishedResult);
-
-    final CountDownLatch callLatch = new CountDownLatch(1);
-    Runnable callback =
-        new Runnable() {
-          @Override
-          public void run() {
-            callLatch.countDown();
-          }
-        };
-    context.scheduleAfterOutputWouldBeProduced(
-        downstream, GlobalWindow.INSTANCE, WindowingStrategy.globalDefault(), callback);
-    assertThat(callLatch.await(1, TimeUnit.SECONDS), is(true));
-  }
-
-  @Test
-  public void extractFiredTimersExtractsTimers() {
-    InProcessTransformResult holdResult =
-        StepTransformResult.withHold(created.getProducingTransformInternal(), new Instant(0))
-            .build();
-    context.handleResult(null, ImmutableList.<TimerData>of(), holdResult);
-
-    String key = "foo";
-    TimerData toFire =
-        TimerData.of(StateNamespaces.global(), new Instant(100L), TimeDomain.EVENT_TIME);
-    InProcessTransformResult timerResult =
-        StepTransformResult.withoutHold(downstream.getProducingTransformInternal())
-            .withState(CopyOnAccessInMemoryStateInternals.withUnderlying(key, null))
-            .withTimerUpdate(TimerUpdate.builder(key).setTimer(toFire).build())
-            .build();
-
-    // haven't added any timers, must be empty
-    assertThat(context.extractFiredTimers().entrySet(), emptyIterable());
-    context.handleResult(
-        context.createKeyedBundle(null, key, created).commit(Instant.now()),
-        ImmutableList.<TimerData>of(),
-        timerResult);
-
-    // timer hasn't fired
-    assertThat(context.extractFiredTimers().entrySet(), emptyIterable());
-
-    InProcessTransformResult advanceResult =
-        StepTransformResult.withoutHold(created.getProducingTransformInternal()).build();
-    // Should cause the downstream timer to fire
-    context.handleResult(null, ImmutableList.<TimerData>of(), advanceResult);
-
-    Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> fired = context.extractFiredTimers();
-    assertThat(
-        fired,
-        Matchers.<AppliedPTransform<?, ?, ?>>hasKey(downstream.getProducingTransformInternal()));
-    Map<Object, FiredTimers> downstreamFired =
-        fired.get(downstream.getProducingTransformInternal());
-    assertThat(downstreamFired, Matchers.<Object>hasKey(key));
-
-    FiredTimers firedForKey = downstreamFired.get(key);
-    assertThat(firedForKey.getTimers(TimeDomain.PROCESSING_TIME), emptyIterable());
-    assertThat(firedForKey.getTimers(TimeDomain.SYNCHRONIZED_PROCESSING_TIME), emptyIterable());
-    assertThat(firedForKey.getTimers(TimeDomain.EVENT_TIME), contains(toFire));
-
-    // Don't reextract timers
-    assertThat(context.extractFiredTimers().entrySet(), emptyIterable());
-  }
-
-  @Test
-  public void createBundleKeyedResultPropagatesKey() {
-    CommittedBundle<KV<String, Integer>> newBundle =
-        context
-            .createBundle(
-                bundleFactory.createKeyedBundle(null, "foo", created).commit(Instant.now()),
-                downstream)
-            .commit(Instant.now());
-    assertThat(newBundle.getKey(), Matchers.<Object>equalTo("foo"));
-  }
-
-  @Test
-  public void createKeyedBundleKeyed() {
-    CommittedBundle<KV<String, Integer>> keyedBundle =
-        context
-            .createKeyedBundle(
-                bundleFactory.createRootBundle(created).commit(Instant.now()), "foo", downstream)
-            .commit(Instant.now());
-    assertThat(keyedBundle.getKey(), Matchers.<Object>equalTo("foo"));
-  }
-
-  @Test
-  public void isDoneWithUnboundedPCollectionAndShutdown() {
-    context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(true);
-    assertThat(context.isDone(unbounded.getProducingTransformInternal()), is(false));
-
-    context.handleResult(
-        null,
-        ImmutableList.<TimerData>of(),
-        StepTransformResult.withoutHold(unbounded.getProducingTransformInternal()).build());
-    assertThat(context.isDone(unbounded.getProducingTransformInternal()), is(true));
-  }
-
-  @Test
-  public void isDoneWithUnboundedPCollectionAndNotShutdown() {
-    context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(false);
-    assertThat(context.isDone(unbounded.getProducingTransformInternal()), is(false));
-
-    context.handleResult(
-        null,
-        ImmutableList.<TimerData>of(),
-        StepTransformResult.withoutHold(unbounded.getProducingTransformInternal()).build());
-    assertThat(context.isDone(unbounded.getProducingTransformInternal()), is(false));
-  }
-
-  @Test
-  public void isDoneWithOnlyBoundedPCollections() {
-    context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(false);
-    assertThat(context.isDone(created.getProducingTransformInternal()), is(false));
-
-    context.handleResult(
-        null,
-        ImmutableList.<TimerData>of(),
-        StepTransformResult.withoutHold(created.getProducingTransformInternal()).build());
-    assertThat(context.isDone(created.getProducingTransformInternal()), is(true));
-  }
-
-  @Test
-  public void isDoneWithPartiallyDone() {
-    context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(true);
-    assertThat(context.isDone(), is(false));
-
-    UncommittedBundle<Integer> rootBundle = context.createRootBundle(created);
-    rootBundle.add(WindowedValue.valueInGlobalWindow(1));
-    CommittedResult handleResult =
-        context.handleResult(
-            null,
-            ImmutableList.<TimerData>of(),
-            StepTransformResult.withoutHold(created.getProducingTransformInternal())
-                .addOutput(rootBundle)
-                .build());
-    @SuppressWarnings("unchecked")
-    CommittedBundle<Integer> committedBundle =
-        (CommittedBundle<Integer>) Iterables.getOnlyElement(handleResult.getOutputs());
-    context.handleResult(
-        null,
-        ImmutableList.<TimerData>of(),
-        StepTransformResult.withoutHold(unbounded.getProducingTransformInternal()).build());
-    assertThat(context.isDone(), is(false));
-
-    for (AppliedPTransform<?, ?, ?> consumers : valueToConsumers.get(created)) {
-      context.handleResult(
-          committedBundle,
-          ImmutableList.<TimerData>of(),
-          StepTransformResult.withoutHold(consumers).build());
-    }
-    assertThat(context.isDone(), is(true));
-  }
-
-  @Test
-  public void isDoneWithUnboundedAndNotShutdown() {
-    context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(false);
-    assertThat(context.isDone(), is(false));
-
-    context.handleResult(
-        null,
-        ImmutableList.<TimerData>of(),
-        StepTransformResult.withoutHold(created.getProducingTransformInternal()).build());
-    context.handleResult(
-        null,
-        ImmutableList.<TimerData>of(),
-        StepTransformResult.withoutHold(unbounded.getProducingTransformInternal()).build());
-    context.handleResult(
-        context.createRootBundle(created).commit(Instant.now()),
-        ImmutableList.<TimerData>of(),
-        StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build());
-    assertThat(context.isDone(), is(false));
-
-    context.handleResult(
-        context.createRootBundle(created).commit(Instant.now()),
-        ImmutableList.<TimerData>of(),
-        StepTransformResult.withoutHold(view.getProducingTransformInternal()).build());
-    assertThat(context.isDone(), is(false));
-  }
-
-  private static class TestBoundedWindow extends BoundedWindow {
-    private final Instant ts;
-
-    public TestBoundedWindow(Instant ts) {
-      this.ts = ts;
-    }
-
-    @Override
-    public Instant maxTimestamp() {
-      return ts;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessPipelineRegistrarTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessPipelineRegistrarTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessPipelineRegistrarTest.java
deleted file mode 100644
index 59a96ed..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessPipelineRegistrarTest.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
-import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
-import org.apache.beam.sdk.runners.inprocess.InProcessRegistrar.InProcessRunner;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.util.ServiceLoader;
-
-/** Tests for {@link InProcessRunner}. */
-@RunWith(JUnit4.class)
-public class InProcessPipelineRegistrarTest {
-  @Test
-  public void testCorrectOptionsAreReturned() {
-    assertEquals(
-        ImmutableList.of(InProcessPipelineOptions.class),
-        new InProcessRegistrar.InProcessOptions().getPipelineOptions());
-  }
-
-  @Test
-  public void testCorrectRunnersAreReturned() {
-    assertEquals(
-        ImmutableList.of(InProcessPipelineRunner.class),
-        new InProcessRegistrar.InProcessRunner().getPipelineRunners());
-  }
-
-  @Test
-  public void testServiceLoaderForOptions() {
-    for (PipelineOptionsRegistrar registrar :
-        Lists.newArrayList(ServiceLoader.load(PipelineOptionsRegistrar.class).iterator())) {
-      if (registrar instanceof InProcessRegistrar.InProcessOptions) {
-        return;
-      }
-    }
-    fail("Expected to find " + InProcessRegistrar.InProcessOptions.class);
-  }
-
-  @Test
-  public void testServiceLoaderForRunner() {
-    for (PipelineRunnerRegistrar registrar :
-        Lists.newArrayList(ServiceLoader.load(PipelineRunnerRegistrar.class).iterator())) {
-      if (registrar instanceof InProcessRegistrar.InProcessRunner) {
-        return;
-      }
-    }
-    fail("Expected to find " + InProcessRegistrar.InProcessRunner.class);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessPipelineRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessPipelineRunnerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessPipelineRunnerTest.java
deleted file mode 100644
index e9e9e36..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessPipelineRunnerTest.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.InProcessPipelineResult;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.SimpleFunction;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.io.Serializable;
-
-/**
- * Tests for basic {@link InProcessPipelineRunner} functionality.
- */
-@RunWith(JUnit4.class)
-public class InProcessPipelineRunnerTest implements Serializable {
-  @Test
-  public void wordCountShouldSucceed() throws Throwable {
-    Pipeline p = getPipeline();
-
-    PCollection<KV<String, Long>> counts =
-        p.apply(Create.of("foo", "bar", "foo", "baz", "bar", "foo"))
-            .apply(MapElements.via(new SimpleFunction<String, String>() {
-              @Override
-              public String apply(String input) {
-                return input;
-              }
-            }))
-            .apply(Count.<String>perElement());
-    PCollection<String> countStrs =
-        counts.apply(MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
-          @Override
-          public String apply(KV<String, Long> input) {
-            String str = String.format("%s: %s", input.getKey(), input.getValue());
-            return str;
-          }
-        }));
-
-    PAssert.that(countStrs).containsInAnyOrder("baz: 1", "bar: 2", "foo: 3");
-
-    InProcessPipelineResult result = ((InProcessPipelineResult) p.run());
-    result.awaitCompletion();
-  }
-
-  private Pipeline getPipeline() {
-    PipelineOptions opts = PipelineOptionsFactory.create();
-    opts.setRunner(InProcessPipelineRunner.class);
-
-    Pipeline p = Pipeline.create(opts);
-    return p;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainerTest.java
deleted file mode 100644
index 03443f8..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainerTest.java
+++ /dev/null
@@ -1,496 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.hasEntry;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.doAnswer;
-
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.runners.inprocess.InProcessEvaluationContext.ReadyCheckingSideInputReader;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.Mean;
-import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.transforms.WithKeys;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
-import org.apache.beam.sdk.util.PCollectionViews;
-import org.apache.beam.sdk.util.SideInputReader;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-
-import com.google.common.collect.ImmutableList;
-
-import org.joda.time.Instant;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.MockitoAnnotations;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-/**
- * Tests for {@link InProcessSideInputContainer}.
- */
-@RunWith(JUnit4.class)
-public class InProcessSideInputContainerTest {
-  private static final BoundedWindow FIRST_WINDOW =
-      new BoundedWindow() {
-        @Override
-        public Instant maxTimestamp() {
-          return new Instant(789541L);
-        }
-
-        @Override
-        public String toString() {
-          return "firstWindow";
-        }
-      };
-
-  private static final BoundedWindow SECOND_WINDOW =
-      new BoundedWindow() {
-        @Override
-        public Instant maxTimestamp() {
-          return new Instant(14564786L);
-        }
-
-        @Override
-        public String toString() {
-          return "secondWindow";
-        }
-      };
-
-  @Rule
-  public ExpectedException thrown = ExpectedException.none();
-
-  @Mock
-  private InProcessEvaluationContext context;
-
-  private TestPipeline pipeline;
-
-  private InProcessSideInputContainer container;
-
-  private PCollectionView<Map<String, Integer>> mapView;
-  private PCollectionView<Double> singletonView;
-
-  // Not present in container.
-  private PCollectionView<Iterable<Integer>> iterableView;
-
-  @Before
-  public void setup() {
-    MockitoAnnotations.initMocks(this);
-    pipeline = TestPipeline.create();
-
-    PCollection<Integer> create =
-        pipeline.apply("forBaseCollection", Create.<Integer>of(1, 2, 3, 4));
-
-    mapView =
-        create.apply("forKeyTypes", WithKeys.<String, Integer>of("foo"))
-            .apply("asMapView", View.<String, Integer>asMap());
-
-    singletonView = create.apply("forCombinedTypes", Mean.<Integer>globally().asSingletonView());
-    iterableView = create.apply("asIterableView", View.<Integer>asIterable());
-
-    container = InProcessSideInputContainer.create(
-        context, ImmutableList.of(iterableView, mapView, singletonView));
-  }
-
-  @Test
-  public void getAfterWriteReturnsPaneInWindow() throws Exception {
-    WindowedValue<KV<String, Integer>> one =
-        WindowedValue.of(
-            KV.of("one", 1), new Instant(1L), FIRST_WINDOW, PaneInfo.ON_TIME_AND_ONLY_FIRING);
-    WindowedValue<KV<String, Integer>> two =
-        WindowedValue.of(
-            KV.of("two", 2), new Instant(20L), FIRST_WINDOW, PaneInfo.ON_TIME_AND_ONLY_FIRING);
-    container.write(mapView, ImmutableList.<WindowedValue<?>>of(one, two));
-
-    Map<String, Integer> viewContents =
-        container
-            .createReaderForViews(ImmutableList.<PCollectionView<?>>of(mapView))
-            .get(mapView, FIRST_WINDOW);
-    assertThat(viewContents, hasEntry("one", 1));
-    assertThat(viewContents, hasEntry("two", 2));
-    assertThat(viewContents.size(), is(2));
-  }
-
-  @Test
-  public void getReturnsLatestPaneInWindow() throws Exception {
-    WindowedValue<KV<String, Integer>> one =
-        WindowedValue.of(
-            KV.of("one", 1),
-            new Instant(1L),
-            SECOND_WINDOW,
-            PaneInfo.createPane(true, false, Timing.EARLY));
-    WindowedValue<KV<String, Integer>> two =
-        WindowedValue.of(
-            KV.of("two", 2),
-            new Instant(20L),
-            SECOND_WINDOW,
-            PaneInfo.createPane(true, false, Timing.EARLY));
-    container.write(mapView, ImmutableList.<WindowedValue<?>>of(one, two));
-
-    Map<String, Integer> viewContents =
-        container
-            .createReaderForViews(ImmutableList.<PCollectionView<?>>of(mapView))
-            .get(mapView, SECOND_WINDOW);
-    assertThat(viewContents, hasEntry("one", 1));
-    assertThat(viewContents, hasEntry("two", 2));
-    assertThat(viewContents.size(), is(2));
-
-    WindowedValue<KV<String, Integer>> three =
-        WindowedValue.of(
-            KV.of("three", 3),
-            new Instant(300L),
-            SECOND_WINDOW,
-            PaneInfo.createPane(false, false, Timing.EARLY, 1, -1));
-    container.write(mapView, ImmutableList.<WindowedValue<?>>of(three));
-
-    Map<String, Integer> overwrittenViewContents =
-        container
-            .createReaderForViews(ImmutableList.<PCollectionView<?>>of(mapView))
-            .get(mapView, SECOND_WINDOW);
-    assertThat(overwrittenViewContents, hasEntry("three", 3));
-    assertThat(overwrittenViewContents.size(), is(1));
-  }
-
-  /**
-   * Demonstrates that calling get() on a window that currently has no data does not return until
-   * there is data in the pane.
-   */
-  @Test
-  public void getBlocksUntilPaneAvailable() throws Exception {
-    BoundedWindow window =
-        new BoundedWindow() {
-          @Override
-          public Instant maxTimestamp() {
-            return new Instant(1024L);
-          }
-        };
-    Future<Double> singletonFuture =
-        getFutureOfView(
-            container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView)),
-            singletonView,
-            window);
-
-    WindowedValue<Double> singletonValue =
-        WindowedValue.of(4.75, new Instant(475L), window, PaneInfo.ON_TIME_AND_ONLY_FIRING);
-
-    assertThat(singletonFuture.isDone(), is(false));
-    container.write(singletonView, ImmutableList.<WindowedValue<?>>of(singletonValue));
-    assertThat(singletonFuture.get(), equalTo(4.75));
-  }
-
-  @Test
-  public void withPCollectionViewsWithPutInOriginalReturnsContents() throws Exception {
-    BoundedWindow window = new BoundedWindow() {
-      @Override
-      public Instant maxTimestamp() {
-        return new Instant(1024L);
-      }
-    };
-    SideInputReader newReader =
-        container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView));
-    Future<Double> singletonFuture = getFutureOfView(newReader, singletonView, window);
-
-    WindowedValue<Double> singletonValue =
-        WindowedValue.of(24.125, new Instant(475L), window, PaneInfo.ON_TIME_AND_ONLY_FIRING);
-
-    assertThat(singletonFuture.isDone(), is(false));
-    container.write(singletonView, ImmutableList.<WindowedValue<?>>of(singletonValue));
-    assertThat(singletonFuture.get(), equalTo(24.125));
-  }
-
-  @Test
-  public void withPCollectionViewsErrorsForContainsNotInViews() {
-    PCollectionView<Map<String, Iterable<String>>> newView =
-        PCollectionViews.multimapView(
-            pipeline,
-            WindowingStrategy.globalDefault(),
-            KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
-
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("with unknown views " + ImmutableList.of(newView).toString());
-
-    container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(newView));
-  }
-
-  @Test
-  public void withViewsForViewNotInContainerFails() {
-    PCollectionView<Map<String, Iterable<String>>> newView =
-        PCollectionViews.multimapView(
-            pipeline,
-            WindowingStrategy.globalDefault(),
-            KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
-
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("unknown views");
-    thrown.expectMessage(newView.toString());
-
-    container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(newView));
-  }
-
-  @Test
-  public void getOnReaderForViewNotInReaderFails() {
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("unknown view: " + iterableView.toString());
-
-    container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(mapView))
-        .get(iterableView, GlobalWindow.INSTANCE);
-  }
-
-  @Test
-  public void writeForMultipleElementsInDifferentWindowsSucceeds() throws Exception {
-    WindowedValue<Double> firstWindowedValue =
-        WindowedValue.of(
-            2.875,
-            FIRST_WINDOW.maxTimestamp().minus(200L),
-            FIRST_WINDOW,
-            PaneInfo.ON_TIME_AND_ONLY_FIRING);
-    WindowedValue<Double> secondWindowedValue =
-        WindowedValue.of(
-            4.125,
-            SECOND_WINDOW.maxTimestamp().minus(2_000_000L),
-            SECOND_WINDOW,
-            PaneInfo.ON_TIME_AND_ONLY_FIRING);
-    container.write(singletonView, ImmutableList.of(firstWindowedValue, secondWindowedValue));
-    assertThat(
-        container
-            .createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView))
-            .get(singletonView, FIRST_WINDOW),
-        equalTo(2.875));
-    assertThat(
-        container
-            .createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView))
-            .get(singletonView, SECOND_WINDOW),
-        equalTo(4.125));
-  }
-
-  @Test
-  public void writeForMultipleIdenticalElementsInSameWindowSucceeds() throws Exception {
-    WindowedValue<Integer> firstValue =
-        WindowedValue.of(
-            44,
-            FIRST_WINDOW.maxTimestamp().minus(200L),
-            FIRST_WINDOW,
-            PaneInfo.ON_TIME_AND_ONLY_FIRING);
-    WindowedValue<Integer> secondValue =
-        WindowedValue.of(
-            44,
-            FIRST_WINDOW.maxTimestamp().minus(200L),
-            FIRST_WINDOW,
-            PaneInfo.ON_TIME_AND_ONLY_FIRING);
-
-    container.write(iterableView, ImmutableList.of(firstValue, secondValue));
-
-    assertThat(
-        container
-            .createReaderForViews(ImmutableList.<PCollectionView<?>>of(iterableView))
-            .get(iterableView, FIRST_WINDOW),
-        contains(44, 44));
-  }
-
-  @Test
-  public void writeForElementInMultipleWindowsSucceeds() throws Exception {
-    WindowedValue<Double> multiWindowedValue =
-        WindowedValue.of(
-            2.875,
-            FIRST_WINDOW.maxTimestamp().minus(200L),
-            ImmutableList.of(FIRST_WINDOW, SECOND_WINDOW),
-            PaneInfo.ON_TIME_AND_ONLY_FIRING);
-    container.write(singletonView, ImmutableList.of(multiWindowedValue));
-    assertThat(
-        container
-            .createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView))
-            .get(singletonView, FIRST_WINDOW),
-        equalTo(2.875));
-    assertThat(
-        container
-            .createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView))
-            .get(singletonView, SECOND_WINDOW),
-        equalTo(2.875));
-  }
-
-  @Test
-  public void finishDoesNotOverwriteWrittenElements() throws Exception {
-    WindowedValue<KV<String, Integer>> one =
-        WindowedValue.of(
-            KV.of("one", 1),
-            new Instant(1L),
-            SECOND_WINDOW,
-            PaneInfo.createPane(true, false, Timing.EARLY));
-    WindowedValue<KV<String, Integer>> two =
-        WindowedValue.of(
-            KV.of("two", 2),
-            new Instant(20L),
-            SECOND_WINDOW,
-            PaneInfo.createPane(true, false, Timing.EARLY));
-    container.write(mapView, ImmutableList.<WindowedValue<?>>of(one, two));
-
-    immediatelyInvokeCallback(mapView, SECOND_WINDOW);
-
-    Map<String, Integer> viewContents =
-        container
-            .createReaderForViews(ImmutableList.<PCollectionView<?>>of(mapView))
-            .get(mapView, SECOND_WINDOW);
-
-    assertThat(viewContents, hasEntry("one", 1));
-    assertThat(viewContents, hasEntry("two", 2));
-    assertThat(viewContents.size(), is(2));
-  }
-
-  @Test
-  public void finishOnPendingViewsSetsEmptyElements() throws Exception {
-    immediatelyInvokeCallback(mapView, SECOND_WINDOW);
-    Future<Map<String, Integer>> mapFuture =
-        getFutureOfView(
-            container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(mapView)),
-            mapView,
-            SECOND_WINDOW);
-
-    assertThat(mapFuture.get().isEmpty(), is(true));
-  }
-
-  /**
-   * Demonstrates that calling isReady on an empty container throws an
-   * {@link IllegalArgumentException}.
-   */
-  @Test
-  public void isReadyInEmptyReaderThrows() {
-    ReadyCheckingSideInputReader reader =
-        container.createReaderForViews(ImmutableList.<PCollectionView<?>>of());
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("does not contain");
-    thrown.expectMessage(ImmutableList.of().toString());
-    reader.isReady(mapView, GlobalWindow.INSTANCE);
-  }
-
-  /**
-   * Demonstrates that calling isReady returns false until elements are written to the
-   * {@link PCollectionView}, {@link BoundedWindow} pair, at which point it returns true.
-   */
-  @Test
-  public void isReadyForSomeNotReadyViewsFalseUntilElements() {
-    container.write(
-        mapView,
-        ImmutableList.of(
-            WindowedValue.of(
-                KV.of("one", 1),
-                SECOND_WINDOW.maxTimestamp().minus(100L),
-                SECOND_WINDOW,
-                PaneInfo.ON_TIME_AND_ONLY_FIRING)));
-
-    ReadyCheckingSideInputReader reader =
-        container.createReaderForViews(ImmutableList.of(mapView, singletonView));
-    assertThat(reader.isReady(mapView, FIRST_WINDOW), is(false));
-    assertThat(reader.isReady(mapView, SECOND_WINDOW), is(true));
-
-    assertThat(reader.isReady(singletonView, SECOND_WINDOW), is(false));
-
-    container.write(
-        mapView,
-        ImmutableList.of(
-            WindowedValue.of(
-                KV.of("too", 2),
-                FIRST_WINDOW.maxTimestamp().minus(100L),
-                FIRST_WINDOW,
-                PaneInfo.ON_TIME_AND_ONLY_FIRING)));
-    assertThat(reader.isReady(mapView, FIRST_WINDOW), is(true));
-
-    container.write(
-        singletonView,
-        ImmutableList.of(
-            WindowedValue.of(
-                1.25,
-                SECOND_WINDOW.maxTimestamp().minus(100L),
-                SECOND_WINDOW,
-                PaneInfo.ON_TIME_AND_ONLY_FIRING)));
-    assertThat(reader.isReady(mapView, SECOND_WINDOW), is(true));
-    assertThat(reader.isReady(singletonView, SECOND_WINDOW), is(true));
-
-    assertThat(reader.isReady(mapView, GlobalWindow.INSTANCE), is(false));
-    assertThat(reader.isReady(singletonView, GlobalWindow.INSTANCE), is(false));
-  }
-
-  @Test
-  public void isReadyForEmptyWindowTrue() {
-    immediatelyInvokeCallback(mapView, GlobalWindow.INSTANCE);
-
-    ReadyCheckingSideInputReader reader =
-        container.createReaderForViews(ImmutableList.of(mapView, singletonView));
-    assertThat(reader.isReady(mapView, GlobalWindow.INSTANCE), is(true));
-    assertThat(reader.isReady(singletonView, GlobalWindow.INSTANCE), is(false));
-
-    immediatelyInvokeCallback(singletonView, GlobalWindow.INSTANCE);
-    assertThat(reader.isReady(singletonView, GlobalWindow.INSTANCE), is(true));
-  }
-
-  /**
-   * When a callAfterWindowCloses with the specified view's producing transform, window, and
-   * windowing strategy is invoked, immediately execute the callback.
-   */
-  private void immediatelyInvokeCallback(PCollectionView<?> view, BoundedWindow window) {
-    doAnswer(
-            new Answer<Void>() {
-              @Override
-              public Void answer(InvocationOnMock invocation) throws Throwable {
-                Object callback = invocation.getArguments()[3];
-                Runnable callbackRunnable = (Runnable) callback;
-                callbackRunnable.run();
-                return null;
-              }
-            })
-        .when(context)
-        .scheduleAfterOutputWouldBeProduced(
-            Mockito.eq(view),
-            Mockito.eq(window),
-            Mockito.eq(view.getWindowingStrategyInternal()),
-            Mockito.any(Runnable.class));
-  }
-
-  private <ValueT> Future<ValueT> getFutureOfView(final SideInputReader myReader,
-      final PCollectionView<ValueT> view, final BoundedWindow window) {
-    Callable<ValueT> callable = new Callable<ValueT>() {
-      @Override
-      public ValueT call() throws Exception {
-        return myReader.get(view, window);
-      }
-    };
-    return Executors.newSingleThreadExecutor().submit(callable);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessTimerInternalsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessTimerInternalsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessTimerInternalsTest.java
deleted file mode 100644
index b496981..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessTimerInternalsTest.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.when;
-
-import org.apache.beam.sdk.runners.inprocess.InMemoryWatermarkManager.TimerUpdate;
-import org.apache.beam.sdk.runners.inprocess.InMemoryWatermarkManager.TimerUpdate.TimerUpdateBuilder;
-import org.apache.beam.sdk.runners.inprocess.InMemoryWatermarkManager.TransformWatermarks;
-import org.apache.beam.sdk.util.TimeDomain;
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
-import org.apache.beam.sdk.util.state.StateNamespaces;
-
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-/**
- * Tests for {@link InProcessTimerInternals}.
- */
-@RunWith(JUnit4.class)
-public class InProcessTimerInternalsTest {
-  private MockClock clock;
-  @Mock private TransformWatermarks watermarks;
-
-  private TimerUpdateBuilder timerUpdateBuilder;
-
-  private InProcessTimerInternals internals;
-
-  @Before
-  public void setup() {
-    MockitoAnnotations.initMocks(this);
-    clock = MockClock.fromInstant(new Instant(0));
-
-    timerUpdateBuilder = TimerUpdate.builder(1234);
-
-    internals = InProcessTimerInternals.create(clock, watermarks, timerUpdateBuilder);
-  }
-
-  @Test
-  public void setTimerAddsToBuilder() {
-    TimerData eventTimer =
-        TimerData.of(StateNamespaces.global(), new Instant(20145L), TimeDomain.EVENT_TIME);
-    TimerData processingTimer =
-        TimerData.of(StateNamespaces.global(), new Instant(125555555L), TimeDomain.PROCESSING_TIME);
-    TimerData synchronizedProcessingTimer =
-        TimerData.of(
-            StateNamespaces.global(),
-            new Instant(98745632189L),
-            TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
-    internals.setTimer(eventTimer);
-    internals.setTimer(processingTimer);
-    internals.setTimer(synchronizedProcessingTimer);
-
-    assertThat(
-        internals.getTimerUpdate().getSetTimers(),
-        containsInAnyOrder(eventTimer, synchronizedProcessingTimer, processingTimer));
-  }
-
-  @Test
-  public void deleteTimerDeletesOnBuilder() {
-    TimerData eventTimer =
-        TimerData.of(StateNamespaces.global(), new Instant(20145L), TimeDomain.EVENT_TIME);
-    TimerData processingTimer =
-        TimerData.of(StateNamespaces.global(), new Instant(125555555L), TimeDomain.PROCESSING_TIME);
-    TimerData synchronizedProcessingTimer =
-        TimerData.of(
-            StateNamespaces.global(),
-            new Instant(98745632189L),
-            TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
-    internals.deleteTimer(eventTimer);
-    internals.deleteTimer(processingTimer);
-    internals.deleteTimer(synchronizedProcessingTimer);
-
-    assertThat(
-        internals.getTimerUpdate().getDeletedTimers(),
-        containsInAnyOrder(eventTimer, synchronizedProcessingTimer, processingTimer));
-  }
-
-  @Test
-  public void getProcessingTimeIsClockNow() {
-    assertThat(internals.currentProcessingTime(), equalTo(clock.now()));
-    Instant oldProcessingTime = internals.currentProcessingTime();
-
-    clock.advance(Duration.standardHours(12));
-
-    assertThat(internals.currentProcessingTime(), equalTo(clock.now()));
-    assertThat(
-        internals.currentProcessingTime(),
-        equalTo(oldProcessingTime.plus(Duration.standardHours(12))));
-  }
-
-  @Test
-  public void getSynchronizedProcessingTimeIsWatermarkSynchronizedInputTime() {
-    when(watermarks.getSynchronizedProcessingInputTime()).thenReturn(new Instant(12345L));
-    assertThat(internals.currentSynchronizedProcessingTime(), equalTo(new Instant(12345L)));
-  }
-
-  @Test
-  public void getInputWatermarkTimeUsesWatermarkTime() {
-    when(watermarks.getInputWatermark()).thenReturn(new Instant(8765L));
-    assertThat(internals.currentInputWatermarkTime(), equalTo(new Instant(8765L)));
-  }
-
-  @Test
-  public void getOutputWatermarkTimeUsesWatermarkTime() {
-    when(watermarks.getOutputWatermark()).thenReturn(new Instant(25525L));
-    assertThat(internals.currentOutputWatermarkTime(), equalTo(new Instant(25525L)));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/KeyedPValueTrackingVisitorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/KeyedPValueTrackingVisitorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/KeyedPValueTrackingVisitorTest.java
deleted file mode 100644
index b89340e..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/KeyedPValueTrackingVisitorTest.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import static org.hamcrest.Matchers.hasItem;
-import static org.hamcrest.Matchers.not;
-import static org.junit.Assert.assertThat;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.IterableCoder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.Keys;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-
-import com.google.common.collect.ImmutableSet;
-
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.util.Collections;
-import java.util.Set;
-
-/**
- * Tests for {@link KeyedPValueTrackingVisitor}.
- */
-@RunWith(JUnit4.class)
-public class KeyedPValueTrackingVisitorTest {
-  @Rule public ExpectedException thrown = ExpectedException.none();
-
-  private KeyedPValueTrackingVisitor visitor;
-  private Pipeline p;
-
-  @Before
-  public void setup() {
-    PipelineOptions options = PipelineOptionsFactory.create();
-
-    p = Pipeline.create(options);
-    @SuppressWarnings("rawtypes")
-    Set<Class<? extends PTransform>> producesKeyed =
-        ImmutableSet.<Class<? extends PTransform>>of(PrimitiveKeyer.class, CompositeKeyer.class);
-    visitor = KeyedPValueTrackingVisitor.create(producesKeyed);
-  }
-
-  @Test
-  public void primitiveProducesKeyedOutputUnkeyedInputKeyedOutput() {
-    PCollection<Integer> keyed =
-        p.apply(Create.<Integer>of(1, 2, 3)).apply(new PrimitiveKeyer<Integer>());
-
-    p.traverseTopologically(visitor);
-    assertThat(visitor.getKeyedPValues(), hasItem(keyed));
-  }
-
-  @Test
-  public void primitiveProducesKeyedOutputKeyedInputKeyedOutut() {
-    PCollection<Integer> keyed =
-        p.apply(Create.<Integer>of(1, 2, 3))
-            .apply("firstKey", new PrimitiveKeyer<Integer>())
-            .apply("secondKey", new PrimitiveKeyer<Integer>());
-
-    p.traverseTopologically(visitor);
-    assertThat(visitor.getKeyedPValues(), hasItem(keyed));
-  }
-
-  @Test
-  public void compositeProducesKeyedOutputUnkeyedInputKeyedOutput() {
-    PCollection<Integer> keyed =
-        p.apply(Create.<Integer>of(1, 2, 3)).apply(new CompositeKeyer<Integer>());
-
-    p.traverseTopologically(visitor);
-    assertThat(visitor.getKeyedPValues(), hasItem(keyed));
-  }
-
-  @Test
-  public void compositeProducesKeyedOutputKeyedInputKeyedOutut() {
-    PCollection<Integer> keyed =
-        p.apply(Create.<Integer>of(1, 2, 3))
-            .apply("firstKey", new CompositeKeyer<Integer>())
-            .apply("secondKey", new CompositeKeyer<Integer>());
-
-    p.traverseTopologically(visitor);
-    assertThat(visitor.getKeyedPValues(), hasItem(keyed));
-  }
-
-
-  @Test
-  public void noInputUnkeyedOutput() {
-    PCollection<KV<Integer, Iterable<Void>>> unkeyed =
-        p.apply(
-            Create.of(KV.<Integer, Iterable<Void>>of(-1, Collections.<Void>emptyList()))
-                .withCoder(KvCoder.of(VarIntCoder.of(), IterableCoder.of(VoidCoder.of()))));
-
-    p.traverseTopologically(visitor);
-    assertThat(visitor.getKeyedPValues(), not(hasItem(unkeyed)));
-  }
-
-  @Test
-  public void keyedInputNotProducesKeyedOutputUnkeyedOutput() {
-    PCollection<Integer> onceKeyed =
-        p.apply(Create.<Integer>of(1, 2, 3))
-            .apply(new PrimitiveKeyer<Integer>())
-            .apply(ParDo.of(new IdentityFn<Integer>()));
-
-    p.traverseTopologically(visitor);
-    assertThat(visitor.getKeyedPValues(), not(hasItem(onceKeyed)));
-  }
-
-  @Test
-  public void unkeyedInputNotProducesKeyedOutputUnkeyedOutput() {
-    PCollection<Integer> unkeyed =
-        p.apply(Create.<Integer>of(1, 2, 3)).apply(ParDo.of(new IdentityFn<Integer>()));
-
-    p.traverseTopologically(visitor);
-    assertThat(visitor.getKeyedPValues(), not(hasItem(unkeyed)));
-  }
-
-  @Test
-  public void traverseMultipleTimesThrows() {
-    p.apply(
-            Create.<KV<Integer, Void>>of(
-                    KV.of(1, (Void) null), KV.of(2, (Void) null), KV.of(3, (Void) null))
-                .withCoder(KvCoder.of(VarIntCoder.of(), VoidCoder.of())))
-        .apply(GroupByKey.<Integer, Void>create())
-        .apply(Keys.<Integer>create());
-
-    p.traverseTopologically(visitor);
-
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("already been finalized");
-    thrown.expectMessage(KeyedPValueTrackingVisitor.class.getSimpleName());
-    p.traverseTopologically(visitor);
-  }
-
-  @Test
-  public void getKeyedPValuesBeforeTraverseThrows() {
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("completely traversed");
-    thrown.expectMessage("getKeyedPValues");
-    visitor.getKeyedPValues();
-  }
-
-  private static class PrimitiveKeyer<K> extends PTransform<PCollection<K>, PCollection<K>> {
-    @Override
-    public PCollection<K> apply(PCollection<K> input) {
-      return PCollection.<K>createPrimitiveOutputInternal(
-              input.getPipeline(), input.getWindowingStrategy(), input.isBounded())
-          .setCoder(input.getCoder());
-    }
-  }
-
-  private static class CompositeKeyer<K> extends PTransform<PCollection<K>, PCollection<K>> {
-    @Override
-    public PCollection<K> apply(PCollection<K> input) {
-      return input.apply(new PrimitiveKeyer<K>()).apply(ParDo.of(new IdentityFn<K>()));
-    }
-  }
-
-  private static class IdentityFn<K> extends DoFn<K, K> {
-    @Override
-    public void processElement(DoFn<K, K>.ProcessContext c) throws Exception {
-      c.output(c.element());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/MockClock.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/MockClock.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/MockClock.java
deleted file mode 100644
index 152cac4..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/MockClock.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-/**
- * A clock that returns a constant value for now which can be set with calls to
- * {@link #set(Instant)}.
- *
- * <p>For uses of the {@link Clock} interface in unit tests.
- */
-public class MockClock implements Clock {
-
-  private Instant now;
-
-  public static MockClock fromInstant(Instant initial) {
-    return new MockClock(initial);
-  }
-
-  private MockClock(Instant initialNow) {
-    this.now = initialNow;
-  }
-
-  public void set(Instant newNow) {
-    checkArgument(!newNow.isBefore(now), "Cannot move MockClock backwards in time from %s to %s",
-        now, newNow);
-    this.now = newNow;
-  }
-
-  public void advance(Duration duration) {
-    checkArgument(
-        duration.getMillis() > 0,
-        "Cannot move MockClock backwards in time by duration %s",
-        duration);
-    set(now.plus(duration));
-  }
-
-  @Override
-  public Instant now() {
-    return now;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ParDoMultiEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ParDoMultiEvaluatorFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ParDoMultiEvaluatorFactoryTest.java
deleted file mode 100644
index a048e3a..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ParDoMultiEvaluatorFactoryTest.java
+++ /dev/null
@@ -1,431 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.not;
-import static org.hamcrest.Matchers.nullValue;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.runners.inprocess.InMemoryWatermarkManager.TimerUpdate;
-import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.ParDo.BoundMulti;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.TimeDomain;
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.common.CounterSet;
-import org.apache.beam.sdk.util.state.BagState;
-import org.apache.beam.sdk.util.state.StateNamespace;
-import org.apache.beam.sdk.util.state.StateNamespaces;
-import org.apache.beam.sdk.util.state.StateTag;
-import org.apache.beam.sdk.util.state.StateTags;
-import org.apache.beam.sdk.util.state.WatermarkHoldState;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TupleTagList;
-
-import org.hamcrest.Matchers;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.io.Serializable;
-
-/**
- * Tests for {@link ParDoMultiEvaluatorFactory}.
- */
-@RunWith(JUnit4.class)
-public class ParDoMultiEvaluatorFactoryTest implements Serializable {
-  private transient BundleFactory bundleFactory = InProcessBundleFactory.create();
-
-  @Test
-  public void testParDoMultiInMemoryTransformEvaluator() throws Exception {
-    TestPipeline p = TestPipeline.create();
-
-    PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
-
-    TupleTag<KV<String, Integer>> mainOutputTag = new TupleTag<KV<String, Integer>>() {};
-    final TupleTag<String> elementTag = new TupleTag<>();
-    final TupleTag<Integer> lengthTag = new TupleTag<>();
-
-    BoundMulti<String, KV<String, Integer>> pardo =
-        ParDo.of(
-                new DoFn<String, KV<String, Integer>>() {
-                  @Override
-                  public void processElement(ProcessContext c) {
-                    c.output(KV.<String, Integer>of(c.element(), c.element().length()));
-                    c.sideOutput(elementTag, c.element());
-                    c.sideOutput(lengthTag, c.element().length());
-                  }
-                })
-            .withOutputTags(mainOutputTag, TupleTagList.of(elementTag).and(lengthTag));
-    PCollectionTuple outputTuple = input.apply(pardo);
-
-    CommittedBundle<String> inputBundle =
-        bundleFactory.createRootBundle(input).commit(Instant.now());
-
-    PCollection<KV<String, Integer>> mainOutput = outputTuple.get(mainOutputTag);
-    PCollection<String> elementOutput = outputTuple.get(elementTag);
-    PCollection<Integer> lengthOutput = outputTuple.get(lengthTag);
-
-    InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class);
-    UncommittedBundle<KV<String, Integer>> mainOutputBundle =
-        bundleFactory.createRootBundle(mainOutput);
-    UncommittedBundle<String> elementOutputBundle = bundleFactory.createRootBundle(elementOutput);
-    UncommittedBundle<Integer> lengthOutputBundle = bundleFactory.createRootBundle(lengthOutput);
-
-    when(evaluationContext.createBundle(inputBundle, mainOutput)).thenReturn(mainOutputBundle);
-    when(evaluationContext.createBundle(inputBundle, elementOutput))
-        .thenReturn(elementOutputBundle);
-    when(evaluationContext.createBundle(inputBundle, lengthOutput)).thenReturn(lengthOutputBundle);
-
-    InProcessExecutionContext executionContext =
-        new InProcessExecutionContext(null, null, null, null);
-    when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), null))
-        .thenReturn(executionContext);
-    CounterSet counters = new CounterSet();
-    when(evaluationContext.createCounterSet()).thenReturn(counters);
-
-    org.apache.beam.sdk.runners.inprocess.TransformEvaluator<String> evaluator =
-        new ParDoMultiEvaluatorFactory()
-            .forApplication(
-                mainOutput.getProducingTransformInternal(), inputBundle, evaluationContext);
-
-    evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
-    evaluator.processElement(
-        WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)));
-    evaluator.processElement(
-        WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING));
-
-    InProcessTransformResult result = evaluator.finishBundle();
-    assertThat(
-        result.getOutputBundles(),
-        Matchers.<UncommittedBundle<?>>containsInAnyOrder(
-            lengthOutputBundle, mainOutputBundle, elementOutputBundle));
-    assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
-    assertThat(result.getCounters(), equalTo(counters));
-
-    assertThat(
-        mainOutputBundle.commit(Instant.now()).getElements(),
-        Matchers.<WindowedValue<KV<String, Integer>>>containsInAnyOrder(
-            WindowedValue.valueInGlobalWindow(KV.of("foo", 3)),
-            WindowedValue.timestampedValueInGlobalWindow(KV.of("bara", 4), new Instant(1000)),
-            WindowedValue.valueInGlobalWindow(
-                KV.of("bazam", 5), PaneInfo.ON_TIME_AND_ONLY_FIRING)));
-    assertThat(
-        elementOutputBundle.commit(Instant.now()).getElements(),
-        Matchers.<WindowedValue<String>>containsInAnyOrder(
-            WindowedValue.valueInGlobalWindow("foo"),
-            WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)),
-            WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING)));
-    assertThat(
-        lengthOutputBundle.commit(Instant.now()).getElements(),
-        Matchers.<WindowedValue<Integer>>containsInAnyOrder(
-            WindowedValue.valueInGlobalWindow(3),
-            WindowedValue.timestampedValueInGlobalWindow(4, new Instant(1000)),
-            WindowedValue.valueInGlobalWindow(5, PaneInfo.ON_TIME_AND_ONLY_FIRING)));
-  }
-
-  @Test
-  public void testParDoMultiUndeclaredSideOutput() throws Exception {
-    TestPipeline p = TestPipeline.create();
-
-    PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
-
-    TupleTag<KV<String, Integer>> mainOutputTag = new TupleTag<KV<String, Integer>>() {};
-    final TupleTag<String> elementTag = new TupleTag<>();
-    final TupleTag<Integer> lengthTag = new TupleTag<>();
-
-    BoundMulti<String, KV<String, Integer>> pardo =
-        ParDo.of(
-                new DoFn<String, KV<String, Integer>>() {
-                  @Override
-                  public void processElement(ProcessContext c) {
-                    c.output(KV.<String, Integer>of(c.element(), c.element().length()));
-                    c.sideOutput(elementTag, c.element());
-                    c.sideOutput(lengthTag, c.element().length());
-                  }
-                })
-            .withOutputTags(mainOutputTag, TupleTagList.of(elementTag));
-    PCollectionTuple outputTuple = input.apply(pardo);
-
-    CommittedBundle<String> inputBundle =
-        bundleFactory.createRootBundle(input).commit(Instant.now());
-
-    PCollection<KV<String, Integer>> mainOutput = outputTuple.get(mainOutputTag);
-    PCollection<String> elementOutput = outputTuple.get(elementTag);
-
-    InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class);
-    UncommittedBundle<KV<String, Integer>> mainOutputBundle =
-        bundleFactory.createRootBundle(mainOutput);
-    UncommittedBundle<String> elementOutputBundle = bundleFactory.createRootBundle(elementOutput);
-
-    when(evaluationContext.createBundle(inputBundle, mainOutput)).thenReturn(mainOutputBundle);
-    when(evaluationContext.createBundle(inputBundle, elementOutput))
-        .thenReturn(elementOutputBundle);
-
-    InProcessExecutionContext executionContext =
-        new InProcessExecutionContext(null, null, null, null);
-    when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), null))
-        .thenReturn(executionContext);
-    CounterSet counters = new CounterSet();
-    when(evaluationContext.createCounterSet()).thenReturn(counters);
-
-    org.apache.beam.sdk.runners.inprocess.TransformEvaluator<String> evaluator =
-        new ParDoMultiEvaluatorFactory()
-            .forApplication(
-                mainOutput.getProducingTransformInternal(), inputBundle, evaluationContext);
-
-    evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
-    evaluator.processElement(
-        WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)));
-    evaluator.processElement(
-        WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING));
-
-    InProcessTransformResult result = evaluator.finishBundle();
-    assertThat(
-        result.getOutputBundles(),
-        Matchers.<UncommittedBundle<?>>containsInAnyOrder(mainOutputBundle, elementOutputBundle));
-    assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
-    assertThat(result.getCounters(), equalTo(counters));
-
-    assertThat(
-        mainOutputBundle.commit(Instant.now()).getElements(),
-        Matchers.<WindowedValue<KV<String, Integer>>>containsInAnyOrder(
-            WindowedValue.valueInGlobalWindow(KV.of("foo", 3)),
-            WindowedValue.timestampedValueInGlobalWindow(KV.of("bara", 4), new Instant(1000)),
-            WindowedValue.valueInGlobalWindow(
-                KV.of("bazam", 5), PaneInfo.ON_TIME_AND_ONLY_FIRING)));
-    assertThat(
-        elementOutputBundle.commit(Instant.now()).getElements(),
-        Matchers.<WindowedValue<String>>containsInAnyOrder(
-            WindowedValue.valueInGlobalWindow("foo"),
-            WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)),
-            WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING)));
-  }
-
-  @Test
-  public void finishBundleWithStatePutsStateInResult() throws Exception {
-    TestPipeline p = TestPipeline.create();
-
-    PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
-
-    TupleTag<KV<String, Integer>> mainOutputTag = new TupleTag<KV<String, Integer>>() {};
-    final TupleTag<String> elementTag = new TupleTag<>();
-
-    final StateTag<Object, WatermarkHoldState<BoundedWindow>> watermarkTag =
-        StateTags.watermarkStateInternal("myId", OutputTimeFns.outputAtEndOfWindow());
-    final StateTag<Object, BagState<String>> bagTag = StateTags.bag("myBag", StringUtf8Coder.of());
-    final StateNamespace windowNs =
-        StateNamespaces.window(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE);
-    BoundMulti<String, KV<String, Integer>> pardo =
-        ParDo.of(
-                new DoFn<String, KV<String, Integer>>() {
-                  @Override
-                  public void processElement(ProcessContext c) {
-                    c.windowingInternals()
-                        .stateInternals()
-                        .state(StateNamespaces.global(), watermarkTag)
-                        .add(new Instant(20202L + c.element().length()));
-                    c.windowingInternals()
-                        .stateInternals()
-                        .state(
-                            StateNamespaces.window(
-                                GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE),
-                            bagTag)
-                        .add(c.element());
-                  }
-                })
-            .withOutputTags(mainOutputTag, TupleTagList.of(elementTag));
-    PCollectionTuple outputTuple = input.apply(pardo);
-
-    CommittedBundle<String> inputBundle =
-        bundleFactory.createRootBundle(input).commit(Instant.now());
-
-    PCollection<KV<String, Integer>> mainOutput = outputTuple.get(mainOutputTag);
-    PCollection<String> elementOutput = outputTuple.get(elementTag);
-
-    InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class);
-    UncommittedBundle<KV<String, Integer>> mainOutputBundle =
-        bundleFactory.createRootBundle(mainOutput);
-    UncommittedBundle<String> elementOutputBundle = bundleFactory.createRootBundle(elementOutput);
-
-    when(evaluationContext.createBundle(inputBundle, mainOutput)).thenReturn(mainOutputBundle);
-    when(evaluationContext.createBundle(inputBundle, elementOutput))
-        .thenReturn(elementOutputBundle);
-
-    InProcessExecutionContext executionContext =
-        new InProcessExecutionContext(null, "myKey", null, null);
-    when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), null))
-        .thenReturn(executionContext);
-    CounterSet counters = new CounterSet();
-    when(evaluationContext.createCounterSet()).thenReturn(counters);
-
-    org.apache.beam.sdk.runners.inprocess.TransformEvaluator<String> evaluator =
-        new ParDoMultiEvaluatorFactory()
-            .forApplication(
-                mainOutput.getProducingTransformInternal(), inputBundle, evaluationContext);
-
-    evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
-    evaluator.processElement(
-        WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)));
-    evaluator.processElement(
-        WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING));
-
-    InProcessTransformResult result = evaluator.finishBundle();
-    assertThat(
-        result.getOutputBundles(),
-        Matchers.<UncommittedBundle<?>>containsInAnyOrder(mainOutputBundle, elementOutputBundle));
-    assertThat(result.getWatermarkHold(), equalTo(new Instant(20205L)));
-    assertThat(result.getState(), not(nullValue()));
-    assertThat(
-        result.getState().state(StateNamespaces.global(), watermarkTag).read(),
-        equalTo(new Instant(20205L)));
-    assertThat(
-        result.getState().state(windowNs, bagTag).read(),
-        containsInAnyOrder("foo", "bara", "bazam"));
-  }
-
-  @Test
-  public void finishBundleWithStateAndTimersPutsTimersInResult() throws Exception {
-    TestPipeline p = TestPipeline.create();
-
-    PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
-
-    TupleTag<KV<String, Integer>> mainOutputTag = new TupleTag<KV<String, Integer>>() {};
-    final TupleTag<String> elementTag = new TupleTag<>();
-
-    final TimerData addedTimer =
-        TimerData.of(
-            StateNamespaces.window(
-                IntervalWindow.getCoder(),
-                new IntervalWindow(
-                    new Instant(0).plus(Duration.standardMinutes(5)),
-                    new Instant(1)
-                        .plus(Duration.standardMinutes(5))
-                        .plus(Duration.standardHours(1)))),
-            new Instant(54541L),
-            TimeDomain.EVENT_TIME);
-    final TimerData deletedTimer =
-        TimerData.of(
-            StateNamespaces.window(
-                IntervalWindow.getCoder(),
-                new IntervalWindow(new Instant(0), new Instant(0).plus(Duration.standardHours(1)))),
-            new Instant(3400000),
-            TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
-
-    BoundMulti<String, KV<String, Integer>> pardo =
-        ParDo.of(
-                new DoFn<String, KV<String, Integer>>() {
-                  @Override
-                  public void processElement(ProcessContext c) {
-                    c.windowingInternals().stateInternals();
-                    c.windowingInternals()
-                        .timerInternals()
-                        .setTimer(
-                            TimerData.of(
-                                StateNamespaces.window(
-                                    IntervalWindow.getCoder(),
-                                    new IntervalWindow(
-                                        new Instant(0).plus(Duration.standardMinutes(5)),
-                                        new Instant(1)
-                                            .plus(Duration.standardMinutes(5))
-                                            .plus(Duration.standardHours(1)))),
-                                new Instant(54541L),
-                                TimeDomain.EVENT_TIME));
-                    c.windowingInternals()
-                        .timerInternals()
-                        .deleteTimer(
-                            TimerData.of(
-                                StateNamespaces.window(
-                                    IntervalWindow.getCoder(),
-                                    new IntervalWindow(
-                                        new Instant(0),
-                                        new Instant(0).plus(Duration.standardHours(1)))),
-                                new Instant(3400000),
-                                TimeDomain.SYNCHRONIZED_PROCESSING_TIME));
-                  }
-                })
-            .withOutputTags(mainOutputTag, TupleTagList.of(elementTag));
-    PCollectionTuple outputTuple = input.apply(pardo);
-
-    CommittedBundle<String> inputBundle =
-        bundleFactory.createRootBundle(input).commit(Instant.now());
-
-    PCollection<KV<String, Integer>> mainOutput = outputTuple.get(mainOutputTag);
-    PCollection<String> elementOutput = outputTuple.get(elementTag);
-
-    InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class);
-    UncommittedBundle<KV<String, Integer>> mainOutputBundle =
-        bundleFactory.createRootBundle(mainOutput);
-    UncommittedBundle<String> elementOutputBundle = bundleFactory.createRootBundle(elementOutput);
-
-    when(evaluationContext.createBundle(inputBundle, mainOutput)).thenReturn(mainOutputBundle);
-    when(evaluationContext.createBundle(inputBundle, elementOutput))
-        .thenReturn(elementOutputBundle);
-
-    InProcessExecutionContext executionContext =
-        new InProcessExecutionContext(null, "myKey", null, null);
-    when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), null))
-        .thenReturn(executionContext);
-    CounterSet counters = new CounterSet();
-    when(evaluationContext.createCounterSet()).thenReturn(counters);
-
-    org.apache.beam.sdk.runners.inprocess.TransformEvaluator<String> evaluator =
-        new ParDoMultiEvaluatorFactory()
-            .forApplication(
-                mainOutput.getProducingTransformInternal(), inputBundle, evaluationContext);
-
-    evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
-    evaluator.processElement(
-        WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)));
-    evaluator.processElement(
-        WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING));
-
-    InProcessTransformResult result = evaluator.finishBundle();
-    assertThat(
-        result.getTimerUpdate(),
-        equalTo(
-            TimerUpdate.builder("myKey")
-                .setTimer(addedTimer)
-                .setTimer(addedTimer)
-                .setTimer(addedTimer)
-                .deletedTimer(deletedTimer)
-                .deletedTimer(deletedTimer)
-                .deletedTimer(deletedTimer)
-                .build()));
-  }
-}


Mime
View raw message