beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jamesmal...@apache.org
Subject [38/50] [abbrv] incubator-beam git commit: Finish Flattenning InProcess package
Date Fri, 26 Feb 2016 22:55:15 GMT
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/06c8911e/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManagerTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManagerTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManagerTest.java
new file mode 100644
index 0000000..2425152
--- /dev/null
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManagerTest.java
@@ -0,0 +1,1099 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.emptyIterable;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertThat;
+
+import com.google.cloud.dataflow.sdk.runners.inprocess.InMemoryWatermarkManager.FiredTimers;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InMemoryWatermarkManager.TimerUpdate;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InMemoryWatermarkManager.TimerUpdate.TimerUpdateBuilder;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InMemoryWatermarkManager.TransformWatermarks;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
+import com.google.cloud.dataflow.sdk.testing.TestPipeline;
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.Filter;
+import com.google.cloud.dataflow.sdk.transforms.Flatten;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.WithKeys;
+import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
+import com.google.cloud.dataflow.sdk.util.TimeDomain;
+import com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.util.state.StateNamespaces;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.PCollectionList;
+import com.google.cloud.dataflow.sdk.values.PValue;
+import com.google.cloud.dataflow.sdk.values.TimestampedValue;
+import com.google.common.collect.ImmutableList;
+
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.joda.time.Instant;
+import org.joda.time.ReadableInstant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Tests for {@link InMemoryWatermarkManager}.
+ */
+@RunWith(JUnit4.class)
+public class InMemoryWatermarkManagerTest implements Serializable {
+  private transient MockClock clock;
+
+  private transient PCollection<Integer> createdInts;
+
+  private transient PCollection<Integer> filtered;
+  private transient PCollection<Integer> filteredTimesTwo;
+  private transient PCollection<KV<String, Integer>> keyed;
+
+  private transient PCollection<Integer> intsToFlatten;
+  private transient PCollection<Integer> flattened;
+
+  private transient InMemoryWatermarkManager manager;
+
+  @Before
+  public void setup() {
+    TestPipeline p = TestPipeline.create();
+
+    createdInts = p.apply("createdInts", Create.of(1, 2, 3));
+
+    filtered = createdInts.apply("filtered", Filter.greaterThan(1));
+    filteredTimesTwo = filtered.apply("timesTwo", ParDo.of(new DoFn<Integer, Integer>() {
+      @Override
+      public void processElement(DoFn<Integer, Integer>.ProcessContext c) throws Exception {
+        c.output(c.element() * 2);
+      }
+    }));
+
+    keyed = createdInts.apply("keyed", WithKeys.<String, Integer>of("MyKey"));
+
+    intsToFlatten = p.apply("intsToFlatten", Create.of(-1, 256, 65535));
+    PCollectionList<Integer> preFlatten = PCollectionList.of(createdInts).and(intsToFlatten);
+    flattened = preFlatten.apply("flattened", Flatten.<Integer>pCollections());
+
+    Collection<AppliedPTransform<?, ?, ?>> rootTransforms =
+        ImmutableList.<AppliedPTransform<?, ?, ?>>of(
+            createdInts.getProducingTransformInternal(),
+            intsToFlatten.getProducingTransformInternal());
+
+    Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> consumers = new HashMap<>();
+    consumers.put(
+        createdInts,
+        ImmutableList.<AppliedPTransform<?, ?, ?>>of(filtered.getProducingTransformInternal(),
+            keyed.getProducingTransformInternal(), flattened.getProducingTransformInternal()));
+    consumers.put(
+        filtered,
+        Collections.<AppliedPTransform<?, ?, ?>>singleton(
+            filteredTimesTwo.getProducingTransformInternal()));
+    consumers.put(filteredTimesTwo, Collections.<AppliedPTransform<?, ?, ?>>emptyList());
+    consumers.put(keyed, Collections.<AppliedPTransform<?, ?, ?>>emptyList());
+
+    consumers.put(
+        intsToFlatten,
+        Collections.<AppliedPTransform<?, ?, ?>>singleton(
+            flattened.getProducingTransformInternal()));
+    consumers.put(flattened, Collections.<AppliedPTransform<?, ?, ?>>emptyList());
+
+    clock = MockClock.fromInstant(new Instant(1000));
+
+    manager = InMemoryWatermarkManager.create(clock, rootTransforms, consumers);
+  }
+
+  /**
+   * Demonstrates that getWatermark, when called on an {@link AppliedPTransform} that has not
+   * processed any elements, returns the {@link BoundedWindow#TIMESTAMP_MIN_VALUE}.
+   */
+  @Test
+  public void getWatermarkForUntouchedTransform() {
+    TransformWatermarks watermarks =
+        manager.getWatermarks(createdInts.getProducingTransformInternal());
+
+    assertThat(watermarks.getInputWatermark(), equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
+    assertThat(watermarks.getOutputWatermark(), equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
+  }
+
+  /**
+   * Demonstrates that getWatermark for a transform that consumes no input uses the Watermark
+   * Hold value provided to it as the output watermark.
+   */
+  @Test
+  public void getWatermarkForUpdatedSourceTransform() {
+    CommittedBundle<Integer> output = globallyWindowedBundle(createdInts, 1);
+    manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
+        Collections.<CommittedBundle<?>>singleton(output), new Instant(8000L));
+    TransformWatermarks updatedSourceWatermark =
+        manager.getWatermarks(createdInts.getProducingTransformInternal());
+
+    assertThat(updatedSourceWatermark.getOutputWatermark(), equalTo(new Instant(8000L)));
+  }
+
+  /**
+   * Demonstrates that getWatermark for a transform that takes multiple inputs is held to the
+   * minimum watermark across all of its inputs.
+   */
+  @Test
+  public void getWatermarkForMultiInputTransform() {
+    CommittedBundle<Integer> secondPcollectionBundle = globallyWindowedBundle(intsToFlatten, -1);
+
+    manager.updateWatermarks(null, intsToFlatten.getProducingTransformInternal(),
+        TimerUpdate.empty(), Collections.<CommittedBundle<?>>singleton(secondPcollectionBundle),
+        BoundedWindow.TIMESTAMP_MAX_VALUE);
+
+    // We didn't do anything for the first source, so we shouldn't have progressed the watermark
+    TransformWatermarks firstSourceWatermark =
+        manager.getWatermarks(createdInts.getProducingTransformInternal());
+    assertThat(
+        firstSourceWatermark.getOutputWatermark(),
+        not(laterThan(BoundedWindow.TIMESTAMP_MIN_VALUE)));
+
+    // the Second Source output all of the elements so it should be done (with a watermark at the
+    // end of time).
+    TransformWatermarks secondSourceWatermark =
+        manager.getWatermarks(intsToFlatten.getProducingTransformInternal());
+    assertThat(
+        secondSourceWatermark.getOutputWatermark(),
+        not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
+
+    // We haven't consumed anything yet, so our watermark should be at the beginning of time
+    TransformWatermarks transformWatermark =
+        manager.getWatermarks(flattened.getProducingTransformInternal());
+    assertThat(
+        transformWatermark.getInputWatermark(), not(laterThan(BoundedWindow.TIMESTAMP_MIN_VALUE)));
+    assertThat(
+        transformWatermark.getOutputWatermark(), not(laterThan(BoundedWindow.TIMESTAMP_MIN_VALUE)));
+
+    CommittedBundle<Integer> flattenedBundleSecondCreate = globallyWindowedBundle(flattened, -1);
+    // We have finished processing the bundle from the second PCollection, but we haven't consumed
+    // anything from the first PCollection yet; so our watermark shouldn't advance
+    manager.updateWatermarks(secondPcollectionBundle, flattened.getProducingTransformInternal(),
+        TimerUpdate.empty(), Collections.<CommittedBundle<?>>singleton(flattenedBundleSecondCreate),
+        null);
+    TransformWatermarks transformAfterProcessing =
+        manager.getWatermarks(flattened.getProducingTransformInternal());
+    manager.updateWatermarks(secondPcollectionBundle, flattened.getProducingTransformInternal(),
+        TimerUpdate.empty(), Collections.<CommittedBundle<?>>singleton(flattenedBundleSecondCreate),
+        null);
+    assertThat(
+        transformAfterProcessing.getInputWatermark(),
+        not(laterThan(BoundedWindow.TIMESTAMP_MIN_VALUE)));
+    assertThat(
+        transformAfterProcessing.getOutputWatermark(),
+        not(laterThan(BoundedWindow.TIMESTAMP_MIN_VALUE)));
+
+    Instant firstCollectionTimestamp = new Instant(10000);
+    CommittedBundle<Integer> firstPcollectionBundle =
+        timestampedBundle(createdInts, TimestampedValue.<Integer>of(5, firstCollectionTimestamp));
+    // the source is done, but elements are still buffered. The source output watermark should be
+    // past the end of the global window
+    manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
+        Collections.<CommittedBundle<?>>singleton(firstPcollectionBundle),
+        new Instant(Long.MAX_VALUE));
+    TransformWatermarks firstSourceWatermarks =
+        manager.getWatermarks(createdInts.getProducingTransformInternal());
+    assertThat(
+        firstSourceWatermarks.getOutputWatermark(),
+        not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
+
+    // We still haven't consumed any of the first source's input, so the watermark should still not
+    // progress
+    TransformWatermarks flattenAfterSourcesProduced =
+        manager.getWatermarks(flattened.getProducingTransformInternal());
+    assertThat(
+        flattenAfterSourcesProduced.getInputWatermark(), not(laterThan(firstCollectionTimestamp)));
+    assertThat(
+        flattenAfterSourcesProduced.getOutputWatermark(), not(laterThan(firstCollectionTimestamp)));
+
+    // We have buffered inputs, but since the PCollection has all of the elements (has a WM past the
+    // end of the global window), we should have a watermark equal to the min among buffered
+    // elements
+    TransformWatermarks withBufferedElements =
+        manager.getWatermarks(flattened.getProducingTransformInternal());
+    assertThat(withBufferedElements.getInputWatermark(), equalTo(firstCollectionTimestamp));
+    assertThat(withBufferedElements.getOutputWatermark(), equalTo(firstCollectionTimestamp));
+
+    CommittedBundle<?> completedFlattenBundle =
+        InProcessBundle.unkeyed(flattened).commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
+    manager.updateWatermarks(firstPcollectionBundle, flattened.getProducingTransformInternal(),
+        TimerUpdate.empty(), Collections.<CommittedBundle<?>>singleton(completedFlattenBundle),
+        null);
+    TransformWatermarks afterConsumingAllInput =
+        manager.getWatermarks(flattened.getProducingTransformInternal());
+    assertThat(
+        afterConsumingAllInput.getInputWatermark(),
+        not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
+    assertThat(
+        afterConsumingAllInput.getOutputWatermark(),
+        not(laterThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
+  }
+
+  /**
+   * Demonstrates that pending elements are independent among
+   * {@link AppliedPTransform AppliedPTransforms} that consume the same input {@link PCollection}.
+   */
+  @Test
+  public void getWatermarkForMultiConsumedCollection() {
+    CommittedBundle<Integer> createdBundle = timestampedBundle(createdInts,
+        TimestampedValue.of(1, new Instant(1_000_000L)), TimestampedValue.of(2, new Instant(1234L)),
+        TimestampedValue.of(3, new Instant(-1000L)));
+    manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
+        Collections.<CommittedBundle<?>>singleton(createdBundle), new Instant(Long.MAX_VALUE));
+    TransformWatermarks createdAfterProducing =
+        manager.getWatermarks(createdInts.getProducingTransformInternal());
+    assertThat(
+        createdAfterProducing.getOutputWatermark(),
+        not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
+
+    CommittedBundle<KV<String, Integer>> keyBundle =
+        timestampedBundle(keyed, TimestampedValue.of(KV.of("MyKey", 1), new Instant(1_000_000L)),
+            TimestampedValue.of(KV.of("MyKey", 2), new Instant(1234L)),
+            TimestampedValue.of(KV.of("MyKey", 3), new Instant(-1000L)));
+    manager.updateWatermarks(createdBundle, keyed.getProducingTransformInternal(),
+        TimerUpdate.empty(), Collections.<CommittedBundle<?>>singleton(keyBundle), null);
+    TransformWatermarks keyedWatermarks =
+        manager.getWatermarks(keyed.getProducingTransformInternal());
+    assertThat(
+        keyedWatermarks.getInputWatermark(), not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
+    assertThat(
+        keyedWatermarks.getOutputWatermark(), not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
+
+    TransformWatermarks filteredWatermarks =
+        manager.getWatermarks(filtered.getProducingTransformInternal());
+    assertThat(filteredWatermarks.getInputWatermark(), not(laterThan(new Instant(-1000L))));
+    assertThat(filteredWatermarks.getOutputWatermark(), not(laterThan(new Instant(-1000L))));
+
+    CommittedBundle<Integer> filteredBundle =
+        timestampedBundle(filtered, TimestampedValue.of(2, new Instant(1234L)));
+    manager.updateWatermarks(createdBundle, filtered.getProducingTransformInternal(),
+        TimerUpdate.empty(), Collections.<CommittedBundle<?>>singleton(filteredBundle), null);
+    TransformWatermarks filteredProcessedWatermarks =
+        manager.getWatermarks(filtered.getProducingTransformInternal());
+    assertThat(
+        filteredProcessedWatermarks.getInputWatermark(),
+        not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
+    assertThat(
+        filteredProcessedWatermarks.getOutputWatermark(),
+        not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
+  }
+
+  /**
+   * Demonstrates that the watermark of an {@link AppliedPTransform} is held to the provided
+   * watermark hold.
+   */
+  @Test
+  public void updateWatermarkWithWatermarkHolds() {
+    CommittedBundle<Integer> createdBundle = timestampedBundle(createdInts,
+        TimestampedValue.of(1, new Instant(1_000_000L)), TimestampedValue.of(2, new Instant(1234L)),
+        TimestampedValue.of(3, new Instant(-1000L)));
+    manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
+        Collections.<CommittedBundle<?>>singleton(createdBundle), new Instant(Long.MAX_VALUE));
+
+    CommittedBundle<KV<String, Integer>> keyBundle =
+        timestampedBundle(keyed, TimestampedValue.of(KV.of("MyKey", 1), new Instant(1_000_000L)),
+            TimestampedValue.of(KV.of("MyKey", 2), new Instant(1234L)),
+            TimestampedValue.of(KV.of("MyKey", 3), new Instant(-1000L)));
+    manager.updateWatermarks(createdBundle, keyed.getProducingTransformInternal(),
+        TimerUpdate.empty(), Collections.<CommittedBundle<?>>singleton(keyBundle),
+        new Instant(500L));
+    TransformWatermarks keyedWatermarks =
+        manager.getWatermarks(keyed.getProducingTransformInternal());
+    assertThat(
+        keyedWatermarks.getInputWatermark(), not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
+    assertThat(keyedWatermarks.getOutputWatermark(), not(laterThan(new Instant(500L))));
+  }
+
+  /**
+   * Demonstrates that the watermark of an {@link AppliedPTransform} is held to the provided
+   * watermark hold.
+   */
+  @Test
+  public void updateWatermarkWithKeyedWatermarkHolds() {
+    CommittedBundle<Integer> firstKeyBundle =
+        InProcessBundle.keyed(createdInts, "Odd")
+            .add(WindowedValue.timestampedValueInGlobalWindow(1, new Instant(1_000_000L)))
+            .add(WindowedValue.timestampedValueInGlobalWindow(3, new Instant(-1000L)))
+            .commit(clock.now());
+
+    CommittedBundle<Integer> secondKeyBundle =
+        InProcessBundle.keyed(createdInts, "Even")
+            .add(WindowedValue.timestampedValueInGlobalWindow(2, new Instant(1234L)))
+            .commit(clock.now());
+
+    manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
+        ImmutableList.of(firstKeyBundle, secondKeyBundle), BoundedWindow.TIMESTAMP_MAX_VALUE);
+
+    manager.updateWatermarks(firstKeyBundle, filtered.getProducingTransformInternal(),
+        TimerUpdate.empty(), Collections.<CommittedBundle<?>>emptyList(), new Instant(-1000L));
+    manager.updateWatermarks(secondKeyBundle, filtered.getProducingTransformInternal(),
+        TimerUpdate.empty(), Collections.<CommittedBundle<?>>emptyList(), new Instant(1234L));
+
+    TransformWatermarks filteredWatermarks =
+        manager.getWatermarks(filtered.getProducingTransformInternal());
+    assertThat(
+        filteredWatermarks.getInputWatermark(),
+        not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
+    assertThat(filteredWatermarks.getOutputWatermark(), not(laterThan(new Instant(-1000L))));
+
+    CommittedBundle<Integer> fauxFirstKeyTimerBundle =
+        InProcessBundle.keyed(createdInts, "Odd").commit(clock.now());
+    manager.updateWatermarks(fauxFirstKeyTimerBundle, filtered.getProducingTransformInternal(),
+        TimerUpdate.empty(), Collections.<CommittedBundle<?>>emptyList(),
+        BoundedWindow.TIMESTAMP_MAX_VALUE);
+
+    assertThat(filteredWatermarks.getOutputWatermark(), equalTo(new Instant(1234L)));
+
+    CommittedBundle<Integer> fauxSecondKeyTimerBundle =
+        InProcessBundle.keyed(createdInts, "Even").commit(clock.now());
+    manager.updateWatermarks(fauxSecondKeyTimerBundle, filtered.getProducingTransformInternal(),
+        TimerUpdate.empty(), Collections.<CommittedBundle<?>>emptyList(), new Instant(5678L));
+    assertThat(filteredWatermarks.getOutputWatermark(), equalTo(new Instant(5678L)));
+
+    manager.updateWatermarks(fauxSecondKeyTimerBundle, filtered.getProducingTransformInternal(),
+        TimerUpdate.empty(), Collections.<CommittedBundle<?>>emptyList(),
+        BoundedWindow.TIMESTAMP_MAX_VALUE);
+    assertThat(
+        filteredWatermarks.getOutputWatermark(),
+        not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
+  }
+
+  /**
+   * Demonstrates that updated output watermarks are monotonic in the presence of late data, when
+   * called on an {@link AppliedPTransform} that consumes no input.
+   */
+  @Test
+  public void updateOutputWatermarkShouldBeMonotonic() {
+    CommittedBundle<?> firstInput =
+        InProcessBundle.unkeyed(createdInts).commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
+    manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
+        Collections.<CommittedBundle<?>>singleton(firstInput), new Instant(0L));
+    TransformWatermarks firstWatermarks =
+        manager.getWatermarks(createdInts.getProducingTransformInternal());
+    assertThat(firstWatermarks.getOutputWatermark(), equalTo(new Instant(0L)));
+
+    CommittedBundle<?> secondInput =
+        InProcessBundle.unkeyed(createdInts).commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
+    manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
+        Collections.<CommittedBundle<?>>singleton(secondInput), new Instant(-250L));
+    TransformWatermarks secondWatermarks =
+        manager.getWatermarks(createdInts.getProducingTransformInternal());
+    assertThat(secondWatermarks.getOutputWatermark(), not(earlierThan(new Instant(0L))));
+  }
+
+  /**
+   * Demonstrates that updated output watermarks are monotonic in the presence of watermark holds
+   * that become earlier than a previous watermark hold.
+   */
+  @Test
+  public void updateWatermarkWithHoldsShouldBeMonotonic() {
+    CommittedBundle<Integer> createdBundle = timestampedBundle(createdInts,
+        TimestampedValue.of(1, new Instant(1_000_000L)), TimestampedValue.of(2, new Instant(1234L)),
+        TimestampedValue.of(3, new Instant(-1000L)));
+    manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
+        Collections.<CommittedBundle<?>>singleton(createdBundle), new Instant(Long.MAX_VALUE));
+
+    CommittedBundle<KV<String, Integer>> keyBundle =
+        timestampedBundle(keyed, TimestampedValue.of(KV.of("MyKey", 1), new Instant(1_000_000L)),
+            TimestampedValue.of(KV.of("MyKey", 2), new Instant(1234L)),
+            TimestampedValue.of(KV.of("MyKey", 3), new Instant(-1000L)));
+    manager.updateWatermarks(createdBundle, keyed.getProducingTransformInternal(),
+        TimerUpdate.empty(), Collections.<CommittedBundle<?>>singleton(keyBundle),
+        new Instant(500L));
+    TransformWatermarks keyedWatermarks =
+        manager.getWatermarks(keyed.getProducingTransformInternal());
+    assertThat(
+        keyedWatermarks.getInputWatermark(), not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
+    assertThat(keyedWatermarks.getOutputWatermark(), not(laterThan(new Instant(500L))));
+    Instant oldOutputWatermark = keyedWatermarks.getOutputWatermark();
+
+    TransformWatermarks updatedWatermarks =
+        manager.getWatermarks(keyed.getProducingTransformInternal());
+    assertThat(
+        updatedWatermarks.getInputWatermark(), not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
+    // We added a hold prior to the old watermark; we shouldn't progress (due to the earlier hold)
+    // but the watermark is monotonic and should not backslide to the new, earlier hold
+    assertThat(updatedWatermarks.getOutputWatermark(), equalTo(oldOutputWatermark));
+  }
+
+  /**
+   * Demonstrates that updateWatermarks in the presence of late data is monotonic.
+   */
+  @Test
+  public void updateWatermarkWithLateData() {
+    Instant sourceWatermark = new Instant(1_000_000L);
+    CommittedBundle<Integer> createdBundle = timestampedBundle(createdInts,
+        TimestampedValue.of(1, sourceWatermark), TimestampedValue.of(2, new Instant(1234L)));
+
+    manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
+        Collections.<CommittedBundle<?>>singleton(createdBundle), sourceWatermark);
+
+    CommittedBundle<KV<String, Integer>> keyBundle =
+        timestampedBundle(keyed, TimestampedValue.of(KV.of("MyKey", 1), sourceWatermark),
+            TimestampedValue.of(KV.of("MyKey", 2), new Instant(1234L)));
+
+    // Finish processing the on-time data. The watermarks should progress to be equal to the source
+    manager.updateWatermarks(createdBundle, keyed.getProducingTransformInternal(),
+        TimerUpdate.empty(), Collections.<CommittedBundle<?>>singleton(keyBundle), null);
+    TransformWatermarks onTimeWatermarks =
+        manager.getWatermarks(keyed.getProducingTransformInternal());
+    assertThat(onTimeWatermarks.getInputWatermark(), equalTo(sourceWatermark));
+    assertThat(onTimeWatermarks.getOutputWatermark(), equalTo(sourceWatermark));
+
+    CommittedBundle<Integer> lateDataBundle =
+        timestampedBundle(createdInts, TimestampedValue.of(3, new Instant(-1000L)));
+    // the late data arrives in a downstream PCollection after its watermark has advanced past it;
+    // we don't advance the watermark past the current watermark until we've consumed the late data
+    manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
+        Collections.<CommittedBundle<?>>singleton(lateDataBundle), new Instant(2_000_000L));
+    TransformWatermarks bufferedLateWm =
+        manager.getWatermarks(createdInts.getProducingTransformInternal());
+    assertThat(bufferedLateWm.getOutputWatermark(), equalTo(new Instant(2_000_000L)));
+
+    // The input watermark should be held to its previous value (not advanced due to late data; not
+    // moved backwards in the presence of watermarks due to monotonicity).
+    TransformWatermarks lateDataBufferedWatermark =
+        manager.getWatermarks(keyed.getProducingTransformInternal());
+    assertThat(lateDataBufferedWatermark.getInputWatermark(), not(earlierThan(sourceWatermark)));
+    assertThat(lateDataBufferedWatermark.getOutputWatermark(), not(earlierThan(sourceWatermark)));
+
+    CommittedBundle<KV<String, Integer>> lateKeyedBundle =
+        timestampedBundle(keyed, TimestampedValue.of(KV.of("MyKey", 3), new Instant(-1000L)));
+    manager.updateWatermarks(lateDataBundle, keyed.getProducingTransformInternal(),
+        TimerUpdate.empty(), Collections.<CommittedBundle<?>>singleton(lateKeyedBundle), null);
+  }
+
+  /**
+   * Demonstrates that after watermarks of an upstream transform are updated, but no output has been
+   * produced, the watermarks of a downstream process are advanced.
+   */
+  @Test
+  public void getWatermarksAfterOnlyEmptyOutput() {
+    CommittedBundle<Integer> emptyCreateOutput = globallyWindowedBundle(createdInts);
+    manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
+        Collections.<CommittedBundle<?>>singleton(emptyCreateOutput),
+        BoundedWindow.TIMESTAMP_MAX_VALUE);
+    TransformWatermarks updatedSourceWatermarks =
+        manager.getWatermarks(createdInts.getProducingTransformInternal());
+
+    assertThat(
+        updatedSourceWatermarks.getOutputWatermark(),
+        not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
+
+    TransformWatermarks finishedFilterWatermarks =
+        manager.getWatermarks(filtered.getProducingTransformInternal());
+    assertThat(
+        finishedFilterWatermarks.getInputWatermark(),
+        not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
+    assertThat(
+        finishedFilterWatermarks.getOutputWatermark(),
+        not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
+  }
+
+  /**
+   * Demonstrates that after watermarks of an upstream transform are updated, but no output has been
+   * produced, and the downstream transform has a watermark hold, the watermark is held to the hold.
+   */
+  @Test
+  public void getWatermarksAfterHoldAndEmptyOutput() {
+    CommittedBundle<Integer> firstCreateOutput = globallyWindowedBundle(createdInts, 1, 2);
+    manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
+        Collections.<CommittedBundle<?>>singleton(firstCreateOutput), new Instant(12_000L));
+
+    CommittedBundle<Integer> firstFilterOutput = globallyWindowedBundle(filtered);
+    manager.updateWatermarks(firstCreateOutput, filtered.getProducingTransformInternal(),
+        TimerUpdate.empty(), Collections.<CommittedBundle<?>>singleton(firstFilterOutput),
+        new Instant(10_000L));
+    TransformWatermarks firstFilterWatermarks =
+        manager.getWatermarks(filtered.getProducingTransformInternal());
+    assertThat(firstFilterWatermarks.getInputWatermark(), not(earlierThan(new Instant(12_000L))));
+    assertThat(firstFilterWatermarks.getOutputWatermark(), not(laterThan(new Instant(10_000L))));
+
+    CommittedBundle<Integer> emptyCreateOutput = globallyWindowedBundle(createdInts);
+    manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
+        Collections.<CommittedBundle<?>>singleton(emptyCreateOutput),
+        BoundedWindow.TIMESTAMP_MAX_VALUE);
+    TransformWatermarks updatedSourceWatermarks =
+        manager.getWatermarks(createdInts.getProducingTransformInternal());
+
+    assertThat(
+        updatedSourceWatermarks.getOutputWatermark(),
+        not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
+
+    TransformWatermarks finishedFilterWatermarks =
+        manager.getWatermarks(filtered.getProducingTransformInternal());
+    assertThat(
+        finishedFilterWatermarks.getInputWatermark(),
+        not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
+    assertThat(finishedFilterWatermarks.getOutputWatermark(), not(laterThan(new Instant(10_000L))));
+  }
+
+  @Test
+  public void getSynchronizedProcessingTimeInputWatermarksHeldToPendingBundles() {
+    TransformWatermarks watermarks =
+        manager.getWatermarks(createdInts.getProducingTransformInternal());
+    assertThat(watermarks.getSynchronizedProcessingInputTime(), equalTo(clock.now()));
+    assertThat(
+        watermarks.getSynchronizedProcessingOutputTime(),
+        equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
+
+    TransformWatermarks filteredWatermarks =
+        manager.getWatermarks(filtered.getProducingTransformInternal());
+    // Non-root processing watermarks don't progress until data has been processed
+    assertThat(
+        filteredWatermarks.getSynchronizedProcessingInputTime(),
+        not(laterThan(BoundedWindow.TIMESTAMP_MIN_VALUE)));
+    assertThat(
+        filteredWatermarks.getSynchronizedProcessingOutputTime(),
+        not(laterThan(BoundedWindow.TIMESTAMP_MIN_VALUE)));
+
+    CommittedBundle<Integer> createOutput =
+        InProcessBundle.unkeyed(createdInts).commit(new Instant(1250L));
+
+    manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
+        Collections.<CommittedBundle<?>>singleton(createOutput), BoundedWindow.TIMESTAMP_MAX_VALUE);
+    TransformWatermarks createAfterUpdate =
+        manager.getWatermarks(createdInts.getProducingTransformInternal());
+    assertThat(createAfterUpdate.getSynchronizedProcessingInputTime(), equalTo(clock.now()));
+    assertThat(createAfterUpdate.getSynchronizedProcessingOutputTime(), equalTo(clock.now()));
+
+    TransformWatermarks filterAfterProduced =
+        manager.getWatermarks(filtered.getProducingTransformInternal());
+    assertThat(
+        filterAfterProduced.getSynchronizedProcessingInputTime(), not(laterThan(clock.now())));
+    assertThat(
+        filterAfterProduced.getSynchronizedProcessingOutputTime(), not(laterThan(clock.now())));
+
+    clock.set(new Instant(1500L));
+    assertThat(createAfterUpdate.getSynchronizedProcessingInputTime(), equalTo(clock.now()));
+    assertThat(createAfterUpdate.getSynchronizedProcessingOutputTime(), equalTo(clock.now()));
+    assertThat(
+        filterAfterProduced.getSynchronizedProcessingInputTime(),
+        not(laterThan(new Instant(1250L))));
+    assertThat(
+        filterAfterProduced.getSynchronizedProcessingOutputTime(),
+        not(laterThan(new Instant(1250L))));
+
+    CommittedBundle<?> filterOutputBundle =
+        InProcessBundle.unkeyed(intsToFlatten).commit(new Instant(1250L));
+    manager.updateWatermarks(createOutput, filtered.getProducingTransformInternal(),
+        TimerUpdate.empty(), Collections.<CommittedBundle<?>>singleton(filterOutputBundle),
+        BoundedWindow.TIMESTAMP_MAX_VALUE);
+    TransformWatermarks filterAfterConsumed =
+        manager.getWatermarks(filtered.getProducingTransformInternal());
+    assertThat(
+        filterAfterConsumed.getSynchronizedProcessingInputTime(),
+        not(laterThan(createAfterUpdate.getSynchronizedProcessingOutputTime())));
+    assertThat(
+        filterAfterConsumed.getSynchronizedProcessingOutputTime(),
+        not(laterThan(filterAfterConsumed.getSynchronizedProcessingInputTime())));
+  }
+
+  /**
+   * Demonstrates that the Synchronized Processing Time output watermark cannot progress past
+   * pending timers in the same set. This propagates to all downstream SynchronizedProcessingTimes.
+   *
+   * <p>Also demonstrate that the result is monotonic.
+   */
+  //  @Test
+  public void getSynchronizedProcessingTimeOutputHeldToPendingTimers() {
+    CommittedBundle<Integer> createdBundle = globallyWindowedBundle(createdInts, 1, 2, 4, 8);
+    manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
+        Collections.<CommittedBundle<?>>singleton(createdBundle), new Instant(1248L));
+
+    TransformWatermarks filteredWms =
+        manager.getWatermarks(filtered.getProducingTransformInternal());
+    TransformWatermarks filteredDoubledWms =
+        manager.getWatermarks(filteredTimesTwo.getProducingTransformInternal());
+    Instant initialFilteredWm = filteredWms.getSynchronizedProcessingOutputTime();
+    Instant initialFilteredDoubledWm = filteredDoubledWms.getSynchronizedProcessingOutputTime();
+
+    CommittedBundle<Integer> filteredBundle = globallyWindowedBundle(filtered, 2, 8);
+    TimerData pastTimer =
+        TimerData.of(StateNamespaces.global(), new Instant(250L), TimeDomain.PROCESSING_TIME);
+    TimerData futureTimer =
+        TimerData.of(StateNamespaces.global(), new Instant(4096L), TimeDomain.PROCESSING_TIME);
+    TimerUpdate timers =
+        TimerUpdate.builder("key").setTimer(pastTimer).setTimer(futureTimer).build();
+    manager.updateWatermarks(createdBundle, filtered.getProducingTransformInternal(), timers,
+        Collections.<CommittedBundle<?>>singleton(filteredBundle),
+        BoundedWindow.TIMESTAMP_MAX_VALUE);
+    Instant startTime = clock.now();
+    clock.set(startTime.plus(250L));
+    // We're held based on the past timer
+    assertThat(filteredWms.getSynchronizedProcessingOutputTime(), not(laterThan(startTime)));
+    assertThat(filteredDoubledWms.getSynchronizedProcessingOutputTime(), not(laterThan(startTime)));
+    // And we're monotonic
+    assertThat(
+        filteredWms.getSynchronizedProcessingOutputTime(), not(earlierThan(initialFilteredWm)));
+    assertThat(
+        filteredDoubledWms.getSynchronizedProcessingOutputTime(),
+        not(earlierThan(initialFilteredDoubledWm)));
+
+    Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> firedTimers =
+        manager.extractFiredTimers();
+    assertThat(
+        firedTimers.get(filtered.getProducingTransformInternal())
+            .get("key")
+            .getTimers(TimeDomain.PROCESSING_TIME),
+        contains(pastTimer));
+    // Our timer has fired, but has not been completed, so it holds our synchronized processing WM
+    assertThat(filteredWms.getSynchronizedProcessingOutputTime(), not(laterThan(startTime)));
+    assertThat(filteredDoubledWms.getSynchronizedProcessingOutputTime(), not(laterThan(startTime)));
+
+    CommittedBundle<Integer> filteredTimerBundle =
+        InProcessBundle.keyed(filtered, "key").commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
+    CommittedBundle<Integer> filteredTimerResult =
+        InProcessBundle.keyed(filteredTimesTwo, "key")
+            .commit(filteredWms.getSynchronizedProcessingOutputTime());
+    // Complete the processing time timer
+    manager.updateWatermarks(filteredTimerBundle, filtered.getProducingTransformInternal(),
+        TimerUpdate.builder("key")
+            .withCompletedTimers(Collections.<TimerData>singleton(pastTimer))
+            .build(),
+        Collections.<CommittedBundle<?>>singleton(filteredTimerResult),
+        BoundedWindow.TIMESTAMP_MAX_VALUE);
+
+    clock.set(startTime.plus(500L));
+    assertThat(filteredWms.getSynchronizedProcessingOutputTime(), not(laterThan(clock.now())));
+    // filtered should be held to the time at which the filteredTimerResult fired
+    assertThat(
+        filteredDoubledWms.getSynchronizedProcessingOutputTime(),
+        not(earlierThan(filteredTimerResult.getSynchronizedProcessingOutputWatermark())));
+
+    manager.updateWatermarks(filteredTimerResult, filteredTimesTwo.getProducingTransformInternal(),
+        TimerUpdate.empty(), Collections.<CommittedBundle<?>>emptyList(),
+        BoundedWindow.TIMESTAMP_MAX_VALUE);
+    assertThat(filteredDoubledWms.getSynchronizedProcessingOutputTime(), equalTo(clock.now()));
+
+    clock.set(new Instant(Long.MAX_VALUE));
+    assertThat(filteredWms.getSynchronizedProcessingOutputTime(), equalTo(new Instant(4096)));
+    assertThat(
+        filteredDoubledWms.getSynchronizedProcessingOutputTime(), equalTo(new Instant(4096)));
+  }
+
+  /**
+   * Demonstrates that if any earlier processing holds appear in the synchronized processing time
+   * output hold the result is monotonic.
+   */
+  @Test
+  public void getSynchronizedProcessingTimeOutputTimeIsMonotonic() {
+    Instant startTime = clock.now();
+    TransformWatermarks watermarks =
+        manager.getWatermarks(createdInts.getProducingTransformInternal());
+    assertThat(watermarks.getSynchronizedProcessingInputTime(), equalTo(startTime));
+
+    TransformWatermarks filteredWatermarks =
+        manager.getWatermarks(filtered.getProducingTransformInternal());
+    // Non-root processing watermarks don't progress until data has been processed
+    assertThat(
+        filteredWatermarks.getSynchronizedProcessingInputTime(),
+        not(laterThan(BoundedWindow.TIMESTAMP_MIN_VALUE)));
+    assertThat(
+        filteredWatermarks.getSynchronizedProcessingOutputTime(),
+        not(laterThan(BoundedWindow.TIMESTAMP_MIN_VALUE)));
+
+    CommittedBundle<Integer> createOutput =
+        InProcessBundle.unkeyed(createdInts).commit(new Instant(1250L));
+
+    manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
+        Collections.<CommittedBundle<?>>singleton(createOutput), BoundedWindow.TIMESTAMP_MAX_VALUE);
+    TransformWatermarks createAfterUpdate =
+        manager.getWatermarks(createdInts.getProducingTransformInternal());
+    assertThat(createAfterUpdate.getSynchronizedProcessingInputTime(), not(laterThan(clock.now())));
+    assertThat(
+        createAfterUpdate.getSynchronizedProcessingOutputTime(), not(laterThan(clock.now())));
+
+    CommittedBundle<Integer> createSecondOutput =
+        InProcessBundle.unkeyed(createdInts).commit(new Instant(750L));
+    manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
+        Collections.<CommittedBundle<?>>singleton(createSecondOutput),
+        BoundedWindow.TIMESTAMP_MAX_VALUE);
+
+    assertThat(createAfterUpdate.getSynchronizedProcessingOutputTime(), equalTo(clock.now()));
+  }
+
+  @Test
+  public void synchronizedProcessingInputTimeIsHeldToUpstreamProcessingTimeTimers() {
+    CommittedBundle<Integer> created = globallyWindowedBundle(createdInts, 1, 2, 3);
+    manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
+        Collections.<CommittedBundle<?>>singleton(created), new Instant(40_900L));
+
+    CommittedBundle<Integer> filteredBundle = globallyWindowedBundle(filtered, 2, 4);
+    Instant upstreamHold = new Instant(2048L);
+    TimerData upstreamProcessingTimer =
+        TimerData.of(StateNamespaces.global(), upstreamHold, TimeDomain.PROCESSING_TIME);
+    manager.updateWatermarks(created, filtered.getProducingTransformInternal(),
+        TimerUpdate.builder("key").setTimer(upstreamProcessingTimer).build(),
+        Collections.<CommittedBundle<?>>singleton(filteredBundle),
+        BoundedWindow.TIMESTAMP_MAX_VALUE);
+
+    TransformWatermarks downstreamWms =
+        manager.getWatermarks(filteredTimesTwo.getProducingTransformInternal());
+    assertThat(downstreamWms.getSynchronizedProcessingInputTime(), equalTo(clock.now()));
+
+    clock.set(BoundedWindow.TIMESTAMP_MAX_VALUE);
+    assertThat(downstreamWms.getSynchronizedProcessingInputTime(), equalTo(upstreamHold));
+
+    manager.extractFiredTimers();
+    // Pending processing time timers that have been fired but aren't completed hold the
+    // synchronized processing time
+    assertThat(downstreamWms.getSynchronizedProcessingInputTime(), equalTo(upstreamHold));
+
+    CommittedBundle<Integer> otherCreated = globallyWindowedBundle(createdInts, 4, 8, 12);
+    manager.updateWatermarks(otherCreated, filtered.getProducingTransformInternal(),
+        TimerUpdate.builder("key")
+            .withCompletedTimers(Collections.singleton(upstreamProcessingTimer))
+            .build(),
+        Collections.<CommittedBundle<?>>emptyList(), BoundedWindow.TIMESTAMP_MAX_VALUE);
+
+    assertThat(downstreamWms.getSynchronizedProcessingInputTime(), not(earlierThan(clock.now())));
+  }
+
+  @Test
+  public void synchronizedProcessingInputTimeIsHeldToPendingBundleTimes() {
+    CommittedBundle<Integer> created = globallyWindowedBundle(createdInts, 1, 2, 3);
+    manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
+        Collections.<CommittedBundle<?>>singleton(created), new Instant(29_919_235L));
+
+    Instant upstreamHold = new Instant(2048L);
+    CommittedBundle<Integer> filteredBundle =
+        InProcessBundle.keyed(filtered, "key").commit(upstreamHold);
+    manager.updateWatermarks(created, filtered.getProducingTransformInternal(), TimerUpdate.empty(),
+        Collections.<CommittedBundle<?>>singleton(filteredBundle),
+        BoundedWindow.TIMESTAMP_MAX_VALUE);
+
+    TransformWatermarks downstreamWms =
+        manager.getWatermarks(filteredTimesTwo.getProducingTransformInternal());
+    assertThat(downstreamWms.getSynchronizedProcessingInputTime(), equalTo(clock.now()));
+
+    clock.set(BoundedWindow.TIMESTAMP_MAX_VALUE);
+    assertThat(downstreamWms.getSynchronizedProcessingInputTime(), equalTo(upstreamHold));
+  }
+
+  @Test
+  public void extractFiredTimersReturnsFiredEventTimeTimers() {
+    Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> initialTimers =
+        manager.extractFiredTimers();
+    // Watermarks haven't advanced
+    assertThat(initialTimers.entrySet(), emptyIterable());
+
+    // Advance WM of keyed past the first timer, but ahead of the second and third
+    CommittedBundle<Integer> createdBundle = globallyWindowedBundle(filtered);
+    manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
+        Collections.singleton(createdBundle), new Instant(1500L));
+
+    TimerData earliestTimer =
+        TimerData.of(StateNamespaces.global(), new Instant(1000), TimeDomain.EVENT_TIME);
+    TimerData middleTimer =
+        TimerData.of(StateNamespaces.global(), new Instant(5000L), TimeDomain.EVENT_TIME);
+    TimerData lastTimer =
+        TimerData.of(StateNamespaces.global(), new Instant(10000L), TimeDomain.EVENT_TIME);
+    Object key = new Object();
+    TimerUpdate update =
+        TimerUpdate.builder(key)
+            .setTimer(earliestTimer)
+            .setTimer(middleTimer)
+            .setTimer(lastTimer)
+            .build();
+
+    manager.updateWatermarks(createdBundle, filtered.getProducingTransformInternal(), update,
+        Collections.<CommittedBundle<?>>singleton(globallyWindowedBundle(intsToFlatten)),
+        new Instant(1000L));
+
+    Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> firstTransformFiredTimers =
+        manager.extractFiredTimers();
+    assertThat(
+        firstTransformFiredTimers.get(filtered.getProducingTransformInternal()), not(nullValue()));
+    Map<Object, FiredTimers> firstFilteredTimers =
+        firstTransformFiredTimers.get(filtered.getProducingTransformInternal());
+    assertThat(firstFilteredTimers.get(key), not(nullValue()));
+    FiredTimers firstFired = firstFilteredTimers.get(key);
+    assertThat(firstFired.getTimers(TimeDomain.EVENT_TIME), contains(earliestTimer));
+
+    manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
+        Collections.<CommittedBundle<?>>emptyList(), new Instant(50_000L));
+    Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> secondTransformFiredTimers =
+        manager.extractFiredTimers();
+    assertThat(
+        secondTransformFiredTimers.get(filtered.getProducingTransformInternal()), not(nullValue()));
+    Map<Object, FiredTimers> secondFilteredTimers =
+        secondTransformFiredTimers.get(filtered.getProducingTransformInternal());
+    assertThat(secondFilteredTimers.get(key), not(nullValue()));
+    FiredTimers secondFired = secondFilteredTimers.get(key);
+    // Contains, in order, middleTimer and then lastTimer
+    assertThat(secondFired.getTimers(TimeDomain.EVENT_TIME), contains(middleTimer, lastTimer));
+  }
+
+  @Test
+  public void extractFiredTimersReturnsFiredProcessingTimeTimers() {
+    Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> initialTimers =
+        manager.extractFiredTimers();
+    // Watermarks haven't advanced
+    assertThat(initialTimers.entrySet(), emptyIterable());
+
+    // Advance WM of keyed past the first timer, but ahead of the second and third
+    CommittedBundle<Integer> createdBundle = globallyWindowedBundle(filtered);
+    manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
+        Collections.singleton(createdBundle), new Instant(1500L));
+
+    TimerData earliestTimer =
+        TimerData.of(StateNamespaces.global(), new Instant(999L), TimeDomain.PROCESSING_TIME);
+    TimerData middleTimer =
+        TimerData.of(StateNamespaces.global(), new Instant(5000L), TimeDomain.PROCESSING_TIME);
+    TimerData lastTimer =
+        TimerData.of(StateNamespaces.global(), new Instant(10000L), TimeDomain.PROCESSING_TIME);
+    Object key = new Object();
+    TimerUpdate update =
+        TimerUpdate.builder(key)
+            .setTimer(lastTimer)
+            .setTimer(earliestTimer)
+            .setTimer(middleTimer)
+            .build();
+
+    manager.updateWatermarks(createdBundle, filtered.getProducingTransformInternal(), update,
+        Collections.<CommittedBundle<?>>singleton(globallyWindowedBundle(intsToFlatten)),
+        new Instant(1000L));
+
+    Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> firstTransformFiredTimers =
+        manager.extractFiredTimers();
+    assertThat(
+        firstTransformFiredTimers.get(filtered.getProducingTransformInternal()), not(nullValue()));
+    Map<Object, FiredTimers> firstFilteredTimers =
+        firstTransformFiredTimers.get(filtered.getProducingTransformInternal());
+    assertThat(firstFilteredTimers.get(key), not(nullValue()));
+    FiredTimers firstFired = firstFilteredTimers.get(key);
+    assertThat(firstFired.getTimers(TimeDomain.PROCESSING_TIME), contains(earliestTimer));
+
+    clock.set(new Instant(50_000L));
+    manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
+        Collections.<CommittedBundle<?>>emptyList(), new Instant(50_000L));
+    Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> secondTransformFiredTimers =
+        manager.extractFiredTimers();
+    assertThat(
+        secondTransformFiredTimers.get(filtered.getProducingTransformInternal()), not(nullValue()));
+    Map<Object, FiredTimers> secondFilteredTimers =
+        secondTransformFiredTimers.get(filtered.getProducingTransformInternal());
+    assertThat(secondFilteredTimers.get(key), not(nullValue()));
+    FiredTimers secondFired = secondFilteredTimers.get(key);
+    // Contains, in order, middleTimer and then lastTimer
+    assertThat(secondFired.getTimers(TimeDomain.PROCESSING_TIME), contains(middleTimer, lastTimer));
+  }
+
+  @Test
+  public void extractFiredTimersReturnsFiredSynchronizedProcessingTimeTimers() {
+    Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> initialTimers =
+        manager.extractFiredTimers();
+    // Watermarks haven't advanced
+    assertThat(initialTimers.entrySet(), emptyIterable());
+
+    // Advance WM of keyed past the first timer, but ahead of the second and third
+    CommittedBundle<Integer> createdBundle = globallyWindowedBundle(filtered);
+    manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
+        Collections.singleton(createdBundle), new Instant(1500L));
+
+    TimerData earliestTimer = TimerData.of(
+        StateNamespaces.global(), new Instant(999L), TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
+    TimerData middleTimer = TimerData.of(
+        StateNamespaces.global(), new Instant(5000L), TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
+    TimerData lastTimer = TimerData.of(
+        StateNamespaces.global(), new Instant(10000L), TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
+    Object key = new Object();
+    TimerUpdate update =
+        TimerUpdate.builder(key)
+            .setTimer(lastTimer)
+            .setTimer(earliestTimer)
+            .setTimer(middleTimer)
+            .build();
+
+    manager.updateWatermarks(createdBundle, filtered.getProducingTransformInternal(), update,
+        Collections.<CommittedBundle<?>>singleton(globallyWindowedBundle(intsToFlatten)),
+        new Instant(1000L));
+
+    Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> firstTransformFiredTimers =
+        manager.extractFiredTimers();
+    assertThat(
+        firstTransformFiredTimers.get(filtered.getProducingTransformInternal()), not(nullValue()));
+    Map<Object, FiredTimers> firstFilteredTimers =
+        firstTransformFiredTimers.get(filtered.getProducingTransformInternal());
+    assertThat(firstFilteredTimers.get(key), not(nullValue()));
+    FiredTimers firstFired = firstFilteredTimers.get(key);
+    assertThat(
+        firstFired.getTimers(TimeDomain.SYNCHRONIZED_PROCESSING_TIME), contains(earliestTimer));
+
+    clock.set(new Instant(50_000L));
+    manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
+        Collections.<CommittedBundle<?>>emptyList(), new Instant(50_000L));
+    Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> secondTransformFiredTimers =
+        manager.extractFiredTimers();
+    assertThat(
+        secondTransformFiredTimers.get(filtered.getProducingTransformInternal()), not(nullValue()));
+    Map<Object, FiredTimers> secondFilteredTimers =
+        secondTransformFiredTimers.get(filtered.getProducingTransformInternal());
+    assertThat(secondFilteredTimers.get(key), not(nullValue()));
+    FiredTimers secondFired = secondFilteredTimers.get(key);
+    // Contains, in order, middleTimer and then lastTimer
+    assertThat(
+        secondFired.getTimers(TimeDomain.SYNCHRONIZED_PROCESSING_TIME),
+        contains(middleTimer, lastTimer));
+  }
+
+  @Test
+  public void timerUpdateBuilderBuildAddsAllAddedTimers() {
+    TimerData set = TimerData.of(StateNamespaces.global(), new Instant(10L), TimeDomain.EVENT_TIME);
+    TimerData deleted =
+        TimerData.of(StateNamespaces.global(), new Instant(24L), TimeDomain.PROCESSING_TIME);
+    TimerData completedOne = TimerData.of(
+        StateNamespaces.global(), new Instant(1024L), TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
+    TimerData completedTwo =
+        TimerData.of(StateNamespaces.global(), new Instant(2048L), TimeDomain.EVENT_TIME);
+
+    TimerUpdate update =
+        TimerUpdate.builder("foo")
+            .withCompletedTimers(ImmutableList.of(completedOne, completedTwo))
+            .setTimer(set)
+            .deletedTimer(deleted)
+            .build();
+
+    assertThat(update.getCompletedTimers(), containsInAnyOrder(completedOne, completedTwo));
+    assertThat(update.getSetTimers(), contains(set));
+    assertThat(update.getDeletedTimers(), contains(deleted));
+  }
+
+  @Test
+  public void timerUpdateBuilderWithSetThenDeleteHasOnlyDeleted() {
+    TimerUpdateBuilder builder = TimerUpdate.builder(null);
+    TimerData timer = TimerData.of(StateNamespaces.global(), Instant.now(), TimeDomain.EVENT_TIME);
+
+    TimerUpdate built = builder.setTimer(timer).deletedTimer(timer).build();
+
+    assertThat(built.getSetTimers(), emptyIterable());
+    assertThat(built.getDeletedTimers(), contains(timer));
+  }
+
+  @Test
+  public void timerUpdateBuilderWithDeleteThenSetHasOnlySet() {
+    TimerUpdateBuilder builder = TimerUpdate.builder(null);
+    TimerData timer = TimerData.of(StateNamespaces.global(), Instant.now(), TimeDomain.EVENT_TIME);
+
+    TimerUpdate built = builder.deletedTimer(timer).setTimer(timer).build();
+
+    assertThat(built.getSetTimers(), contains(timer));
+    assertThat(built.getDeletedTimers(), emptyIterable());
+  }
+
+  @Test
+  public void timerUpdateBuilderWithSetAfterBuildNotAddedToBuilt() {
+    TimerUpdateBuilder builder = TimerUpdate.builder(null);
+    TimerData timer = TimerData.of(StateNamespaces.global(), Instant.now(), TimeDomain.EVENT_TIME);
+
+    TimerUpdate built = builder.build();
+    builder.setTimer(timer);
+    assertThat(built.getSetTimers(), emptyIterable());
+    builder.build();
+    assertThat(built.getSetTimers(), emptyIterable());
+  }
+
+  @Test
+  public void timerUpdateBuilderWithDeleteAfterBuildNotAddedToBuilt() {
+    TimerUpdateBuilder builder = TimerUpdate.builder(null);
+    TimerData timer = TimerData.of(StateNamespaces.global(), Instant.now(), TimeDomain.EVENT_TIME);
+
+    TimerUpdate built = builder.build();
+    builder.deletedTimer(timer);
+    assertThat(built.getDeletedTimers(), emptyIterable());
+    builder.build();
+    assertThat(built.getDeletedTimers(), emptyIterable());
+  }
+
+  @Test
+  public void timerUpdateBuilderWithCompletedAfterBuildNotAddedToBuilt() {
+    TimerUpdateBuilder builder = TimerUpdate.builder(null);
+    TimerData timer = TimerData.of(StateNamespaces.global(), Instant.now(), TimeDomain.EVENT_TIME);
+
+    TimerUpdate built = builder.build();
+    builder.withCompletedTimers(ImmutableList.of(timer));
+    assertThat(built.getCompletedTimers(), emptyIterable());
+    builder.build();
+    assertThat(built.getCompletedTimers(), emptyIterable());
+  }
+
+  private static Matcher<Instant> earlierThan(final Instant laterInstant) {
+    return new BaseMatcher<Instant>() {
+      @Override
+      public boolean matches(Object item) {
+        ReadableInstant instant = (ReadableInstant) item;
+        return instant.isBefore(laterInstant);
+      }
+
+      @Override
+      public void describeTo(Description description) {
+        description.appendText("earlier than ").appendValue(laterInstant);
+      }
+    };
+  }
+
+  private static Matcher<Instant> laterThan(final Instant shouldBeEarlier) {
+    return new BaseMatcher<Instant>() {
+      @Override
+      public boolean matches(Object item) {
+        ReadableInstant instant = (ReadableInstant) item;
+        return instant.isAfter(shouldBeEarlier);
+      }
+
+      @Override
+      public void describeTo(Description description) {
+        description.appendText("later than ").appendValue(shouldBeEarlier);
+      }
+    };
+  }
+
+  @SafeVarargs
+  private final <T> CommittedBundle<T> timestampedBundle(
+      PCollection<T> pc, TimestampedValue<T>... values) {
+    UncommittedBundle<T> bundle = InProcessBundle.unkeyed(pc);
+    for (TimestampedValue<T> value : values) {
+      bundle.add(
+          WindowedValue.timestampedValueInGlobalWindow(value.getValue(), value.getTimestamp()));
+    }
+    return bundle.commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
+  }
+
+  @SafeVarargs
+  private final <T> CommittedBundle<T> globallyWindowedBundle(PCollection<T> pc, T... values) {
+    UncommittedBundle<T> bundle = InProcessBundle.unkeyed(pc);
+    for (T value : values) {
+      bundle.add(WindowedValue.valueInGlobalWindow(value));
+    }
+    return bundle.commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/06c8911e/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundleTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundleTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundleTest.java
new file mode 100644
index 0000000..dcba86b
--- /dev/null
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundleTest.java
@@ -0,0 +1,143 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertThat;
+
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
+import com.google.cloud.dataflow.sdk.testing.TestPipeline;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.common.collect.ImmutableList;
+
+import org.hamcrest.Matcher;
+import org.hamcrest.Matchers;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Tests for {@link InProcessBundle}.
+ */
+@RunWith(JUnit4.class)
+public class InProcessBundleTest {
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void unkeyedShouldCreateWithNullKey() {
+    PCollection<Integer> pcollection = TestPipeline.create().apply(Create.of(1));
+
+    InProcessBundle<Integer> inFlightBundle = InProcessBundle.unkeyed(pcollection);
+
+    CommittedBundle<Integer> bundle = inFlightBundle.commit(Instant.now());
+
+    assertThat(bundle.isKeyed(), is(false));
+    assertThat(bundle.getKey(), nullValue());
+  }
+
+  private void keyedCreateBundle(Object key) {
+    PCollection<Integer> pcollection = TestPipeline.create().apply(Create.of(1));
+
+    InProcessBundle<Integer> inFlightBundle = InProcessBundle.keyed(pcollection, key);
+
+    CommittedBundle<Integer> bundle = inFlightBundle.commit(Instant.now());
+    assertThat(bundle.isKeyed(), is(true));
+    assertThat(bundle.getKey(), equalTo(key));
+  }
+
+  @Test
+  public void keyedWithNullKeyShouldCreateKeyedBundle() {
+    keyedCreateBundle(null);
+  }
+
+  @Test
+  public void keyedWithKeyShouldCreateKeyedBundle() {
+    keyedCreateBundle(new Object());
+  }
+
+  private <T> void afterCommitGetElementsShouldHaveAddedElements(Iterable<WindowedValue<T>> elems) {
+    PCollection<T> pcollection = TestPipeline.create().apply(Create.<T>of());
+
+    InProcessBundle<T> bundle = InProcessBundle.unkeyed(pcollection);
+    Collection<Matcher<? super WindowedValue<T>>> expectations = new ArrayList<>();
+    for (WindowedValue<T> elem : elems) {
+      bundle.add(elem);
+      expectations.add(equalTo(elem));
+    }
+    Matcher<Iterable<? extends WindowedValue<T>>> containsMatcher =
+        Matchers.<WindowedValue<T>>containsInAnyOrder(expectations);
+    assertThat(bundle.commit(Instant.now()).getElements(), containsMatcher);
+  }
+
+  @Test
+  public void getElementsBeforeAddShouldReturnEmptyIterable() {
+    afterCommitGetElementsShouldHaveAddedElements(Collections.<WindowedValue<Integer>>emptyList());
+  }
+
+  @Test
+  public void getElementsAfterAddShouldReturnAddedElements() {
+    WindowedValue<Integer> firstValue = WindowedValue.valueInGlobalWindow(1);
+    WindowedValue<Integer> secondValue =
+        WindowedValue.timestampedValueInGlobalWindow(2, new Instant(1000L));
+
+    afterCommitGetElementsShouldHaveAddedElements(ImmutableList.of(firstValue, secondValue));
+  }
+
+  @Test
+  public void addAfterCommitShouldThrowException() {
+    PCollection<Integer> pcollection = TestPipeline.create().apply(Create.<Integer>of());
+
+    InProcessBundle<Integer> bundle = InProcessBundle.unkeyed(pcollection);
+    bundle.add(WindowedValue.valueInGlobalWindow(1));
+    CommittedBundle<Integer> firstCommit = bundle.commit(Instant.now());
+    assertThat(firstCommit.getElements(), containsInAnyOrder(WindowedValue.valueInGlobalWindow(1)));
+
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage("3");
+    thrown.expectMessage("committed");
+
+    bundle.add(WindowedValue.valueInGlobalWindow(3));
+  }
+
+  @Test
+  public void commitAfterCommitShouldThrowException() {
+    PCollection<Integer> pcollection = TestPipeline.create().apply(Create.<Integer>of());
+
+    InProcessBundle<Integer> bundle = InProcessBundle.unkeyed(pcollection);
+    bundle.add(WindowedValue.valueInGlobalWindow(1));
+    CommittedBundle<Integer> firstCommit = bundle.commit(Instant.now());
+    assertThat(firstCommit.getElements(), containsInAnyOrder(WindowedValue.valueInGlobalWindow(1)));
+
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage("committed");
+
+    bundle.commit(Instant.now());
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/06c8911e/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessTimerInternalsTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessTimerInternalsTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessTimerInternalsTest.java
new file mode 100644
index 0000000..435a5ba
--- /dev/null
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessTimerInternalsTest.java
@@ -0,0 +1,131 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.when;
+
+import com.google.cloud.dataflow.sdk.runners.inprocess.InMemoryWatermarkManager.TimerUpdate;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InMemoryWatermarkManager.TimerUpdate.TimerUpdateBuilder;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InMemoryWatermarkManager.TransformWatermarks;
+import com.google.cloud.dataflow.sdk.util.TimeDomain;
+import com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData;
+import com.google.cloud.dataflow.sdk.util.state.StateNamespaces;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/**
+ * Tests for {@link InProcessTimerInternals}.
+ */
+@RunWith(JUnit4.class)
+public class InProcessTimerInternalsTest {
+  private MockClock clock;
+  @Mock private TransformWatermarks watermarks;
+
+  private TimerUpdateBuilder timerUpdateBuilder;
+
+  private InProcessTimerInternals internals;
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+    clock = MockClock.fromInstant(new Instant(0));
+
+    timerUpdateBuilder = TimerUpdate.builder(1234);
+
+    internals = InProcessTimerInternals.create(clock, watermarks, timerUpdateBuilder);
+  }
+
+  @Test
+  public void setTimerAddsToBuilder() {
+    TimerData eventTimer =
+        TimerData.of(StateNamespaces.global(), new Instant(20145L), TimeDomain.EVENT_TIME);
+    TimerData processingTimer =
+        TimerData.of(StateNamespaces.global(), new Instant(125555555L), TimeDomain.PROCESSING_TIME);
+    TimerData synchronizedProcessingTimer =
+        TimerData.of(
+            StateNamespaces.global(),
+            new Instant(98745632189L),
+            TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
+    internals.setTimer(eventTimer);
+    internals.setTimer(processingTimer);
+    internals.setTimer(synchronizedProcessingTimer);
+
+    assertThat(
+        internals.getTimerUpdate().getSetTimers(),
+        containsInAnyOrder(eventTimer, synchronizedProcessingTimer, processingTimer));
+  }
+
+  @Test
+  public void deleteTimerDeletesOnBuilder() {
+    TimerData eventTimer =
+        TimerData.of(StateNamespaces.global(), new Instant(20145L), TimeDomain.EVENT_TIME);
+    TimerData processingTimer =
+        TimerData.of(StateNamespaces.global(), new Instant(125555555L), TimeDomain.PROCESSING_TIME);
+    TimerData synchronizedProcessingTimer =
+        TimerData.of(
+            StateNamespaces.global(),
+            new Instant(98745632189L),
+            TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
+    internals.deleteTimer(eventTimer);
+    internals.deleteTimer(processingTimer);
+    internals.deleteTimer(synchronizedProcessingTimer);
+
+    assertThat(
+        internals.getTimerUpdate().getDeletedTimers(),
+        containsInAnyOrder(eventTimer, synchronizedProcessingTimer, processingTimer));
+  }
+
+  @Test
+  public void getProcessingTimeIsClockNow() {
+    assertThat(internals.currentProcessingTime(), equalTo(clock.now()));
+    Instant oldProcessingTime = internals.currentProcessingTime();
+
+    clock.advance(Duration.standardHours(12));
+
+    assertThat(internals.currentProcessingTime(), equalTo(clock.now()));
+    assertThat(
+        internals.currentProcessingTime(),
+        equalTo(oldProcessingTime.plus(Duration.standardHours(12))));
+  }
+
+  @Test
+  public void getSynchronizedProcessingTimeIsWatermarkSynchronizedInputTime() {
+    when(watermarks.getSynchronizedProcessingInputTime()).thenReturn(new Instant(12345L));
+    assertThat(internals.currentSynchronizedProcessingTime(), equalTo(new Instant(12345L)));
+  }
+
+  @Test
+  public void getInputWatermarkTimeUsesWatermarkTime() {
+    when(watermarks.getInputWatermark()).thenReturn(new Instant(8765L));
+    assertThat(internals.currentInputWatermarkTime(), equalTo(new Instant(8765L)));
+  }
+
+  @Test
+  public void getOutputWatermarkTimeUsesWatermarkTime() {
+    when(watermarks.getOutputWatermark()).thenReturn(new Instant(25525L));
+    assertThat(internals.currentOutputWatermarkTime(), equalTo(new Instant(25525L)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/06c8911e/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/MockClock.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/MockClock.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/MockClock.java
new file mode 100644
index 0000000..d69660b
--- /dev/null
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/MockClock.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * A clock that returns a constant value for now which can be set with calls to
+ * {@link #set(Instant)}.
+ *
+ * <p>For uses of the {@link Clock} interface in unit tests.
+ */
+public class MockClock implements Clock {
+
+  private Instant now;
+
+  public static MockClock fromInstant(Instant initial) {
+    return new MockClock(initial);
+  }
+
+  private MockClock(Instant initialNow) {
+    this.now = initialNow;
+  }
+
+  public void set(Instant newNow) {
+    checkArgument(!newNow.isBefore(now), "Cannot move MockClock backwards in time from %s to %s",
+        now, newNow);
+    this.now = newNow;
+  }
+
+  public void advance(Duration duration) {
+    checkArgument(
+        duration.getMillis() > 0,
+        "Cannot move MockClock backwards in time by duration %s",
+        duration);
+    set(now.plus(duration));
+  }
+
+  @Override
+  public Instant now() {
+    return now;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/06c8911e/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactoryTest.java
index 80863b9..033f9de 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactoryTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactoryTest.java
@@ -24,11 +24,10 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InMemoryWatermarkManager.TimerUpdate;
 import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
 import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext;
 import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
-import com.google.cloud.dataflow.sdk.runners.inprocess.util.InMemoryWatermarkManager.TimerUpdate;
-import com.google.cloud.dataflow.sdk.runners.inprocess.util.InProcessBundle;
 import com.google.cloud.dataflow.sdk.testing.TestPipeline;
 import com.google.cloud.dataflow.sdk.transforms.Create;
 import com.google.cloud.dataflow.sdk.transforms.DoFn;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/06c8911e/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactoryTest.java
index 919e69e..ae599ba 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactoryTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactoryTest.java
@@ -24,11 +24,10 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InMemoryWatermarkManager.TimerUpdate;
 import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
 import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext;
 import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
-import com.google.cloud.dataflow.sdk.runners.inprocess.util.InMemoryWatermarkManager.TimerUpdate;
-import com.google.cloud.dataflow.sdk.runners.inprocess.util.InProcessBundle;
 import com.google.cloud.dataflow.sdk.testing.TestPipeline;
 import com.google.cloud.dataflow.sdk.transforms.Create;
 import com.google.cloud.dataflow.sdk.transforms.DoFn;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/06c8911e/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java
index 8640056..f139c56 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java
@@ -27,7 +27,6 @@ import com.google.cloud.dataflow.sdk.io.Read;
 import com.google.cloud.dataflow.sdk.io.UnboundedSource;
 import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext;
 import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
-import com.google.cloud.dataflow.sdk.runners.inprocess.util.InProcessBundle;
 import com.google.cloud.dataflow.sdk.testing.TestPipeline;
 import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
 import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/06c8911e/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactoryTest.java
index 021709b..2f5cd0f 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactoryTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactoryTest.java
@@ -27,7 +27,6 @@ import com.google.cloud.dataflow.sdk.coders.VoidCoder;
 import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
 import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext;
 import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.PCollectionViewWriter;
-import com.google.cloud.dataflow.sdk.runners.inprocess.util.InProcessBundle;
 import com.google.cloud.dataflow.sdk.testing.TestPipeline;
 import com.google.cloud.dataflow.sdk.transforms.Create;
 import com.google.cloud.dataflow.sdk.transforms.GroupByKey;


Mime
View raw message