beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbono...@apache.org
Subject [24/28] beam git commit: Revert "[BEAM-2610] This closes #3553"
Date Thu, 20 Jul 2017 17:09:51 GMT
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
index 1c6cd30..b526305 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
@@ -17,88 +17,545 @@
  */
 package org.apache.beam.runners.core;
 
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasItems;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Objects;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.state.BagState;
 import org.apache.beam.sdk.state.CombiningState;
+import org.apache.beam.sdk.state.GroupingState;
 import org.apache.beam.sdk.state.MapState;
+import org.apache.beam.sdk.state.ReadableState;
 import org.apache.beam.sdk.state.SetState;
-import org.apache.beam.sdk.state.State;
 import org.apache.beam.sdk.state.ValueState;
 import org.apache.beam.sdk.state.WatermarkHoldState;
 import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.hamcrest.Matchers;
+import org.joda.time.Instant;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
-import org.junit.runners.Suite;
 
 /**
- * Tests for {@link InMemoryStateInternals}. This is based on {@link StateInternalsTest}.
+ * Tests for {@link InMemoryStateInternals}.
  */
-@RunWith(Suite.class)
-@Suite.SuiteClasses({
-    InMemoryStateInternalsTest.StandardStateInternalsTests.class,
-    InMemoryStateInternalsTest.OtherTests.class
-})
+@RunWith(JUnit4.class)
 public class InMemoryStateInternalsTest {
+  private static final BoundedWindow WINDOW_1 = new IntervalWindow(new Instant(0), new Instant(10));
+  private static final StateNamespace NAMESPACE_1 = new StateNamespaceForTest("ns1");
+  private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2");
+  private static final StateNamespace NAMESPACE_3 = new StateNamespaceForTest("ns3");
 
-  /**
-   * A standard StateInternals test.
-   */
-  @RunWith(JUnit4.class)
-  public static class StandardStateInternalsTests extends StateInternalsTest {
-    @Override
-    protected StateInternals createStateInternals() {
-      return new InMemoryStateInternals<>("dummyKey");
-    }
+  private static final StateTag<ValueState<String>> STRING_VALUE_ADDR =
+      StateTags.value("stringValue", StringUtf8Coder.of());
+  private static final StateTag<CombiningState<Integer, int[], Integer>>
+      SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal(
+          "sumInteger", VarIntCoder.of(), Sum.ofIntegers());
+  private static final StateTag<BagState<String>> STRING_BAG_ADDR =
+      StateTags.bag("stringBag", StringUtf8Coder.of());
+  private static final StateTag<SetState<String>> STRING_SET_ADDR =
+      StateTags.set("stringSet", StringUtf8Coder.of());
+  private static final StateTag<MapState<String, Integer>> STRING_MAP_ADDR =
+      StateTags.map("stringMap", StringUtf8Coder.of(), VarIntCoder.of());
+  private static final StateTag<WatermarkHoldState> WATERMARK_EARLIEST_ADDR =
+      StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST);
+  private static final StateTag<WatermarkHoldState> WATERMARK_LATEST_ADDR =
+      StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST);
+  private static final StateTag<WatermarkHoldState> WATERMARK_EOW_ADDR =
+      StateTags.watermarkStateInternal("watermark", TimestampCombiner.END_OF_WINDOW);
+
+  InMemoryStateInternals<String> underTest = InMemoryStateInternals.forKey("dummyKey");
+
+  @Test
+  public void testValue() throws Exception {
+    ValueState<String> value = underTest.state(NAMESPACE_1, STRING_VALUE_ADDR);
+
+    // State instances are cached, but depend on the namespace.
+    assertThat(underTest.state(NAMESPACE_1, STRING_VALUE_ADDR), Matchers.sameInstance(value));
+    assertThat(
+        underTest.state(NAMESPACE_2, STRING_VALUE_ADDR),
+        Matchers.not(Matchers.sameInstance(value)));
+
+    assertThat(value.read(), Matchers.nullValue());
+    value.write("hello");
+    assertThat(value.read(), equalTo("hello"));
+    value.write("world");
+    assertThat(value.read(), equalTo("world"));
+
+    value.clear();
+    assertThat(value.read(), Matchers.nullValue());
+    assertThat(underTest.state(NAMESPACE_1, STRING_VALUE_ADDR), Matchers.sameInstance(value));
+  }
+
+  @Test
+  public void testBag() throws Exception {
+    BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
+
+    // State instances are cached, but depend on the namespace.
+    assertThat(value, equalTo(underTest.state(NAMESPACE_1, STRING_BAG_ADDR)));
+    assertThat(value, not(equalTo(underTest.state(NAMESPACE_2, STRING_BAG_ADDR))));
+
+    assertThat(value.read(), Matchers.emptyIterable());
+    value.add("hello");
+    assertThat(value.read(), containsInAnyOrder("hello"));
+
+    value.add("world");
+    assertThat(value.read(), containsInAnyOrder("hello", "world"));
+
+    value.clear();
+    assertThat(value.read(), Matchers.emptyIterable());
+    assertThat(underTest.state(NAMESPACE_1, STRING_BAG_ADDR), Matchers.sameInstance(value));
+  }
+
+  @Test
+  public void testBagIsEmpty() throws Exception {
+    BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
+
+    assertThat(value.isEmpty().read(), Matchers.is(true));
+    ReadableState<Boolean> readFuture = value.isEmpty();
+    value.add("hello");
+    assertThat(readFuture.read(), Matchers.is(false));
+
+    value.clear();
+    assertThat(readFuture.read(), Matchers.is(true));
+  }
+
+  @Test
+  public void testMergeBagIntoSource() throws Exception {
+    BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
+    BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR);
+
+    bag1.add("Hello");
+    bag2.add("World");
+    bag1.add("!");
+
+    StateMerging.mergeBags(Arrays.asList(bag1, bag2), bag1);
+
+    // Reading the merged bag returns the combined contents of the source bags.
+    assertThat(bag1.read(), containsInAnyOrder("Hello", "World", "!"));
+    assertThat(bag2.read(), Matchers.emptyIterable());
+  }
+
+  @Test
+  public void testMergeBagIntoNewNamespace() throws Exception {
+    BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
+    BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR);
+    BagState<String> bag3 = underTest.state(NAMESPACE_3, STRING_BAG_ADDR);
+
+    bag1.add("Hello");
+    bag2.add("World");
+    bag1.add("!");
+
+    StateMerging.mergeBags(Arrays.asList(bag1, bag2, bag3), bag3);
+
+    // Reading the merged bag returns the combined contents of the source bags.
+    assertThat(bag3.read(), containsInAnyOrder("Hello", "World", "!"));
+    assertThat(bag1.read(), Matchers.emptyIterable());
+    assertThat(bag2.read(), Matchers.emptyIterable());
+  }
+
+  @Test
+  public void testSet() throws Exception {
+    SetState<String> value = underTest.state(NAMESPACE_1, STRING_SET_ADDR);
+
+    // State instances are cached, but depend on the namespace.
+    assertThat(value, equalTo(underTest.state(NAMESPACE_1, STRING_SET_ADDR)));
+    assertThat(value, not(equalTo(underTest.state(NAMESPACE_2, STRING_SET_ADDR))));
+
+    // empty
+    assertThat(value.read(), Matchers.emptyIterable());
+    assertFalse(value.contains("A").read());
+
+    // add
+    value.add("A");
+    value.add("B");
+    value.add("A");
+    assertFalse(value.addIfAbsent("B").read());
+    assertThat(value.read(), containsInAnyOrder("A", "B"));
+
+    // remove
+    value.remove("A");
+    assertThat(value.read(), containsInAnyOrder("B"));
+    value.remove("C");
+    assertThat(value.read(), containsInAnyOrder("B"));
+
+    // contains
+    assertFalse(value.contains("A").read());
+    assertTrue(value.contains("B").read());
+    value.add("C");
+    value.add("D");
+
+    // readLater
+    assertThat(value.readLater().read(), containsInAnyOrder("B", "C", "D"));
+    SetState<String> later = value.readLater();
+    assertThat(later.read(), hasItems("C", "D"));
+    assertFalse(later.contains("A").read());
+
+    // clear
+    value.clear();
+    assertThat(value.read(), Matchers.emptyIterable());
+    assertThat(underTest.state(NAMESPACE_1, STRING_SET_ADDR), Matchers.sameInstance(value));
+
+  }
+
+  @Test
+  public void testSetIsEmpty() throws Exception {
+    SetState<String> value = underTest.state(NAMESPACE_1, STRING_SET_ADDR);
+
+    assertThat(value.isEmpty().read(), Matchers.is(true));
+    ReadableState<Boolean> readFuture = value.isEmpty();
+    value.add("hello");
+    assertThat(readFuture.read(), Matchers.is(false));
+
+    value.clear();
+    assertThat(readFuture.read(), Matchers.is(true));
+  }
+
+  @Test
+  public void testMergeSetIntoSource() throws Exception {
+    SetState<String> set1 = underTest.state(NAMESPACE_1, STRING_SET_ADDR);
+    SetState<String> set2 = underTest.state(NAMESPACE_2, STRING_SET_ADDR);
+
+    set1.add("Hello");
+    set2.add("Hello");
+    set2.add("World");
+    set1.add("!");
+
+    StateMerging.mergeSets(Arrays.asList(set1, set2), set1);
+
+    // Reading the merged set returns the combined contents of the source sets.
+    assertThat(set1.read(), containsInAnyOrder("Hello", "World", "!"));
+    assertThat(set2.read(), Matchers.emptyIterable());
   }
 
-  /**
-   * A specific test of InMemoryStateInternals.
-   */
-  @RunWith(JUnit4.class)
-  public static class OtherTests {
-
-    private static final StateNamespace NAMESPACE = new StateNamespaceForTest("ns");
-
-    private static final StateTag<ValueState<String>> STRING_VALUE_ADDR =
-        StateTags.value("stringValue", StringUtf8Coder.of());
-    private static final StateTag<CombiningState<Integer, int[], Integer>>
-        SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal(
-        "sumInteger", VarIntCoder.of(), Sum.ofIntegers());
-    private static final StateTag<BagState<String>> STRING_BAG_ADDR =
-        StateTags.bag("stringBag", StringUtf8Coder.of());
-    private static final StateTag<SetState<String>> STRING_SET_ADDR =
-        StateTags.set("stringSet", StringUtf8Coder.of());
-    private static final StateTag<MapState<String, Integer>> STRING_MAP_ADDR =
-        StateTags.map("stringMap", StringUtf8Coder.of(), VarIntCoder.of());
-    private static final StateTag<WatermarkHoldState> WATERMARK_EARLIEST_ADDR =
-        StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST);
-    private static final StateTag<WatermarkHoldState> WATERMARK_LATEST_ADDR =
-        StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST);
-    private static final StateTag<WatermarkHoldState> WATERMARK_EOW_ADDR =
-        StateTags.watermarkStateInternal("watermark", TimestampCombiner.END_OF_WINDOW);
-
-    StateInternals underTest = new InMemoryStateInternals<>("dummyKey");
-
-    @Test
-    public void testSameInstance() {
-      assertSameInstance(STRING_VALUE_ADDR);
-      assertSameInstance(SUM_INTEGER_ADDR);
-      assertSameInstance(STRING_BAG_ADDR);
-      assertSameInstance(STRING_SET_ADDR);
-      assertSameInstance(STRING_MAP_ADDR);
-      assertSameInstance(WATERMARK_EARLIEST_ADDR);
+  @Test
+  public void testMergeSetIntoNewNamespace() throws Exception {
+    SetState<String> set1 = underTest.state(NAMESPACE_1, STRING_SET_ADDR);
+    SetState<String> set2 = underTest.state(NAMESPACE_2, STRING_SET_ADDR);
+    SetState<String> set3 = underTest.state(NAMESPACE_3, STRING_SET_ADDR);
+
+    set1.add("Hello");
+    set2.add("Hello");
+    set2.add("World");
+    set1.add("!");
+
+    StateMerging.mergeSets(Arrays.asList(set1, set2, set3), set3);
+
+    // Reading the merged set returns the combined contents of the source sets.
+    assertThat(set3.read(), containsInAnyOrder("Hello", "World", "!"));
+    assertThat(set1.read(), Matchers.emptyIterable());
+    assertThat(set2.read(), Matchers.emptyIterable());
+  }
+
+  // for testMap
+  private static class MapEntry<K, V> implements Map.Entry<K, V> {
+    private K key;
+    private V value;
+
+    private MapEntry(K key, V value) {
+      this.key = key;
+      this.value = value;
+    }
+
+    static <K, V> Map.Entry<K, V> of(K k, V v) {
+      return new MapEntry<>(k, v);
+    }
+
+    public final K getKey() {
+      return key;
+    }
+    public final V getValue() {
+      return value;
+    }
+
+    public final String toString() {
+      return key + "=" + value;
+    }
+
+    public final int hashCode() {
+      return Objects.hashCode(key) ^ Objects.hashCode(value);
+    }
+
+    public final V setValue(V newValue) {
+      V oldValue = value;
+      value = newValue;
+      return oldValue;
     }
 
-    private <T extends State> void assertSameInstance(StateTag<T> address) {
-      assertThat(underTest.state(NAMESPACE, address),
-          Matchers.sameInstance(underTest.state(NAMESPACE, address)));
+    public final boolean equals(Object o) {
+      if (o == this) {
+        return true;
+      }
+      if (o instanceof Map.Entry) {
+        Map.Entry<?, ?> e = (Map.Entry<?, ?>) o;
+        if (Objects.equals(key, e.getKey())
+            && Objects.equals(value, e.getValue())) {
+          return true;
+        }
+      }
+      return false;
     }
   }
 
+  @Test
+  public void testMap() throws Exception {
+    MapState<String, Integer> value = underTest.state(NAMESPACE_1, STRING_MAP_ADDR);
+
+    // State instances are cached, but depend on the namespace.
+    assertThat(value, equalTo(underTest.state(NAMESPACE_1, STRING_MAP_ADDR)));
+    assertThat(value, not(equalTo(underTest.state(NAMESPACE_2, STRING_MAP_ADDR))));
+
+    // put
+    assertThat(value.entries().read(), Matchers.emptyIterable());
+    value.put("A", 1);
+    value.put("B", 2);
+    value.put("A", 11);
+    assertThat(value.putIfAbsent("B", 22).read(), equalTo(2));
+    assertThat(value.entries().read(), containsInAnyOrder(MapEntry.of("A", 11),
+        MapEntry.of("B", 2)));
+
+    // remove
+    value.remove("A");
+    assertThat(value.entries().read(), containsInAnyOrder(MapEntry.of("B", 2)));
+    value.remove("C");
+    assertThat(value.entries().read(), containsInAnyOrder(MapEntry.of("B", 2)));
+
+    // get
+    assertNull(value.get("A").read());
+    assertThat(value.get("B").read(), equalTo(2));
+    value.put("C", 3);
+    value.put("D", 4);
+    assertThat(value.get("C").read(), equalTo(3));
+
+    // iterate
+    value.put("E", 5);
+    value.remove("C");
+    assertThat(value.keys().read(), containsInAnyOrder("B", "D", "E"));
+    assertThat(value.values().read(), containsInAnyOrder(2, 4, 5));
+    assertThat(
+        value.entries().read(),
+        containsInAnyOrder(MapEntry.of("B", 2), MapEntry.of("D", 4), MapEntry.of("E", 5)));
+
+    // readLater
+    assertThat(value.get("B").readLater().read(), equalTo(2));
+    assertNull(value.get("A").readLater().read());
+    assertThat(
+        value.entries().readLater().read(),
+        containsInAnyOrder(MapEntry.of("B", 2), MapEntry.of("D", 4), MapEntry.of("E", 5)));
+
+    // clear
+    value.clear();
+    assertThat(value.entries().read(), Matchers.emptyIterable());
+    assertThat(underTest.state(NAMESPACE_1, STRING_MAP_ADDR), Matchers.sameInstance(value));
+  }
+
+  @Test
+  public void testCombiningValue() throws Exception {
+    GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+
+    // State instances are cached, but depend on the namespace.
+    assertEquals(value, underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR));
+    assertFalse(value.equals(underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR)));
+
+    assertThat(value.read(), equalTo(0));
+    value.add(2);
+    assertThat(value.read(), equalTo(2));
+
+    value.add(3);
+    assertThat(value.read(), equalTo(5));
+
+    value.clear();
+    assertThat(value.read(), equalTo(0));
+    assertThat(underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR), Matchers.sameInstance(value));
+  }
+
+  @Test
+  public void testCombiningIsEmpty() throws Exception {
+    GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+
+    assertThat(value.isEmpty().read(), Matchers.is(true));
+    ReadableState<Boolean> readFuture = value.isEmpty();
+    value.add(5);
+    assertThat(readFuture.read(), Matchers.is(false));
+
+    value.clear();
+    assertThat(readFuture.read(), Matchers.is(true));
+  }
+
+  @Test
+  public void testMergeCombiningValueIntoSource() throws Exception {
+    CombiningState<Integer, int[], Integer> value1 =
+        underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+    CombiningState<Integer, int[], Integer> value2 =
+        underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
+
+    value1.add(5);
+    value2.add(10);
+    value1.add(6);
+
+    assertThat(value1.read(), equalTo(11));
+    assertThat(value2.read(), equalTo(10));
+
+    // Merging clears the old values and updates the result value.
+    StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value1);
+
+    assertThat(value1.read(), equalTo(21));
+    assertThat(value2.read(), equalTo(0));
+  }
+
+  @Test
+  public void testMergeCombiningValueIntoNewNamespace() throws Exception {
+    CombiningState<Integer, int[], Integer> value1 =
+        underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+    CombiningState<Integer, int[], Integer> value2 =
+        underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
+    CombiningState<Integer, int[], Integer> value3 =
+        underTest.state(NAMESPACE_3, SUM_INTEGER_ADDR);
+
+    value1.add(5);
+    value2.add(10);
+    value1.add(6);
+
+    StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value3);
+
+    // Merging clears the old values and updates the result value.
+    assertThat(value1.read(), equalTo(0));
+    assertThat(value2.read(), equalTo(0));
+    assertThat(value3.read(), equalTo(21));
+  }
+
+  @Test
+  public void testWatermarkEarliestState() throws Exception {
+    WatermarkHoldState value =
+        underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
+
+    // State instances are cached, but depend on the namespace.
+    assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR));
+    assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR)));
+
+    assertThat(value.read(), Matchers.nullValue());
+    value.add(new Instant(2000));
+    assertThat(value.read(), equalTo(new Instant(2000)));
+
+    value.add(new Instant(3000));
+    assertThat(value.read(), equalTo(new Instant(2000)));
+
+    value.add(new Instant(1000));
+    assertThat(value.read(), equalTo(new Instant(1000)));
+
+    value.clear();
+    assertThat(value.read(), equalTo(null));
+    assertThat(underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR), Matchers.sameInstance(value));
+  }
+
+  @Test
+  public void testWatermarkLatestState() throws Exception {
+    WatermarkHoldState value =
+        underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
+
+    // State instances are cached, but depend on the namespace.
+    assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR));
+    assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR)));
+
+    assertThat(value.read(), Matchers.nullValue());
+    value.add(new Instant(2000));
+    assertThat(value.read(), equalTo(new Instant(2000)));
+
+    value.add(new Instant(3000));
+    assertThat(value.read(), equalTo(new Instant(3000)));
+
+    value.add(new Instant(1000));
+    assertThat(value.read(), equalTo(new Instant(3000)));
+
+    value.clear();
+    assertThat(value.read(), equalTo(null));
+    assertThat(underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR), Matchers.sameInstance(value));
+  }
+
+  @Test
+  public void testWatermarkEndOfWindowState() throws Exception {
+    WatermarkHoldState value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR);
+
+    // State instances are cached, but depend on the namespace.
+    assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR));
+    assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_EOW_ADDR)));
+
+    assertThat(value.read(), Matchers.nullValue());
+    value.add(new Instant(2000));
+    assertThat(value.read(), equalTo(new Instant(2000)));
+
+    value.clear();
+    assertThat(value.read(), equalTo(null));
+    assertThat(underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR), Matchers.sameInstance(value));
+  }
+
+  @Test
+  public void testWatermarkStateIsEmpty() throws Exception {
+    WatermarkHoldState value =
+        underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
+
+    assertThat(value.isEmpty().read(), Matchers.is(true));
+    ReadableState<Boolean> readFuture = value.isEmpty();
+    value.add(new Instant(1000));
+    assertThat(readFuture.read(), Matchers.is(false));
+
+    value.clear();
+    assertThat(readFuture.read(), Matchers.is(true));
+  }
+
+  @Test
+  public void testMergeEarliestWatermarkIntoSource() throws Exception {
+    WatermarkHoldState value1 =
+        underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
+    WatermarkHoldState value2 =
+        underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR);
+
+    value1.add(new Instant(3000));
+    value2.add(new Instant(5000));
+    value1.add(new Instant(4000));
+    value2.add(new Instant(2000));
+
+    // Merging clears the old values and updates the merged value.
+    StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value1, WINDOW_1);
+
+    assertThat(value1.read(), equalTo(new Instant(2000)));
+    assertThat(value2.read(), equalTo(null));
+  }
+
+  @Test
+  public void testMergeLatestWatermarkIntoSource() throws Exception {
+    WatermarkHoldState value1 =
+        underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
+    WatermarkHoldState value2 =
+        underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR);
+    WatermarkHoldState value3 =
+        underTest.state(NAMESPACE_3, WATERMARK_LATEST_ADDR);
+
+    value1.add(new Instant(3000));
+    value2.add(new Instant(5000));
+    value1.add(new Instant(4000));
+    value2.add(new Instant(2000));
+
+    // Merging clears the old values and updates the result value.
+    StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value3, WINDOW_1);
+
+    // Merging clears the old values and updates the result value.
+    assertThat(value3.read(), equalTo(new Instant(5000)));
+    assertThat(value1.read(), equalTo(null));
+    assertThat(value2.read(), equalTo(null));
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java
index 959909e..a2f6acc 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java
@@ -17,22 +17,18 @@
  */
 package org.apache.beam.runners.core;
 
-import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.resume;
-import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.stop;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.lessThan;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
 
 import java.util.Collection;
 import java.util.concurrent.Executors;
-import org.apache.beam.sdk.io.range.OffsetRange;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRange;
 import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -46,27 +42,19 @@ import org.junit.Test;
 /** Tests for {@link OutputAndTimeBoundedSplittableProcessElementInvoker}. */
 public class OutputAndTimeBoundedSplittableProcessElementInvokerTest {
   private static class SomeFn extends DoFn<Integer, String> {
-    private final int numOutputsPerProcessCall;
     private final Duration sleepBeforeEachOutput;
 
-    private SomeFn(int numOutputsPerProcessCall, Duration sleepBeforeEachOutput) {
-      this.numOutputsPerProcessCall = numOutputsPerProcessCall;
+    private SomeFn(Duration sleepBeforeEachOutput) {
       this.sleepBeforeEachOutput = sleepBeforeEachOutput;
     }
 
     @ProcessElement
-    public ProcessContinuation process(ProcessContext context, OffsetRangeTracker tracker)
+    public void process(ProcessContext context, OffsetRangeTracker tracker)
         throws Exception {
-      for (long i = tracker.currentRestriction().getFrom(), numIterations = 1;
-          tracker.tryClaim(i);
-          ++i, ++numIterations) {
+      for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
         Thread.sleep(sleepBeforeEachOutput.getMillis());
         context.output("" + i);
-        if (numIterations == numOutputsPerProcessCall) {
-          return resume();
-        }
       }
-      return stop();
     }
 
     @GetInitialRestriction
@@ -76,8 +64,8 @@ public class OutputAndTimeBoundedSplittableProcessElementInvokerTest {
   }
 
   private SplittableProcessElementInvoker<Integer, String, OffsetRange, OffsetRangeTracker>.Result
-      runTest(int totalNumOutputs, int numOutputsPerProcessCall, Duration sleepPerElement) {
-    SomeFn fn = new SomeFn(numOutputsPerProcessCall, sleepPerElement);
+      runTest(int count, Duration sleepPerElement) {
+    SomeFn fn = new SomeFn(sleepPerElement);
     SplittableProcessElementInvoker<Integer, String, OffsetRange, OffsetRangeTracker> invoker =
         new OutputAndTimeBoundedSplittableProcessElementInvoker<>(
             fn,
@@ -105,15 +93,14 @@ public class OutputAndTimeBoundedSplittableProcessElementInvokerTest {
 
     return invoker.invokeProcessElement(
         DoFnInvokers.invokerFor(fn),
-        WindowedValue.of(totalNumOutputs, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING),
-        new OffsetRangeTracker(new OffsetRange(0, totalNumOutputs)));
+        WindowedValue.of(count, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING),
+        new OffsetRangeTracker(new OffsetRange(0, count)));
   }
 
   @Test
   public void testInvokeProcessElementOutputBounded() throws Exception {
     SplittableProcessElementInvoker<Integer, String, OffsetRange, OffsetRangeTracker>.Result res =
-        runTest(10000, Integer.MAX_VALUE, Duration.ZERO);
-    assertFalse(res.getContinuation().shouldResume());
+        runTest(10000, Duration.ZERO);
     OffsetRange residualRange = res.getResidualRestriction();
     // Should process the first 1000 elements.
     assertEquals(1000, residualRange.getFrom());
@@ -123,8 +110,7 @@ public class OutputAndTimeBoundedSplittableProcessElementInvokerTest {
   @Test
   public void testInvokeProcessElementTimeBounded() throws Exception {
     SplittableProcessElementInvoker<Integer, String, OffsetRange, OffsetRangeTracker>.Result res =
-        runTest(10000, Integer.MAX_VALUE, Duration.millis(100));
-    assertFalse(res.getContinuation().shouldResume());
+        runTest(10000, Duration.millis(100));
     OffsetRange residualRange = res.getResidualRestriction();
     // Should process ideally around 30 elements - but due to timing flakiness, we can't enforce
     // that precisely. Just test that it's not egregiously off.
@@ -134,18 +120,9 @@ public class OutputAndTimeBoundedSplittableProcessElementInvokerTest {
   }
 
   @Test
-  public void testInvokeProcessElementVoluntaryReturnStop() throws Exception {
+  public void testInvokeProcessElementVoluntaryReturn() throws Exception {
     SplittableProcessElementInvoker<Integer, String, OffsetRange, OffsetRangeTracker>.Result res =
-        runTest(5, Integer.MAX_VALUE, Duration.millis(100));
-    assertFalse(res.getContinuation().shouldResume());
+        runTest(5, Duration.millis(100));
     assertNull(res.getResidualRestriction());
   }
-
-  @Test
-  public void testInvokeProcessElementVoluntaryReturnResume() throws Exception {
-    SplittableProcessElementInvoker<Integer, String, OffsetRange, OffsetRangeTracker>.Result res =
-        runTest(10, 5, Duration.millis(100));
-    assertTrue(res.getContinuation().shouldResume());
-    assertEquals(new OffsetRange(5, 10), res.getResidualRestriction());
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
index 4f13af1..9e71300 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
@@ -55,7 +55,6 @@ import org.apache.beam.sdk.transforms.windowing.AfterPane;
 import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
 import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
@@ -68,7 +67,6 @@ import org.apache.beam.sdk.transforms.windowing.Sessions;
 import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
 import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.transforms.windowing.Trigger;
-import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
@@ -142,40 +140,7 @@ public class ReduceFnRunnerTest {
       }
     })
     .when(mockTrigger).onFire(anyTriggerContext());
-  }
-
-  /**
-   * Tests that a processing time timer does not cause window GC.
-   */
-  @Test
-  public void testProcessingTimeTimerDoesNotGc() throws Exception {
-    WindowingStrategy<?, IntervalWindow> strategy =
-        WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(100)))
-            .withTimestampCombiner(TimestampCombiner.EARLIEST)
-            .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
-            .withAllowedLateness(Duration.ZERO)
-            .withTrigger(
-                Repeatedly.forever(
-                    AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(10))));
-
-    ReduceFnTester<Integer, Integer, IntervalWindow> tester =
-        ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of());
-
-    tester.advanceProcessingTime(new Instant(5000));
-    injectElement(tester, 2); // processing timer @ 5000 + 10; EOW timer @ 100
-    injectElement(tester, 5);
-
-    tester.advanceProcessingTime(new Instant(10000));
-
-    tester.assertHasOnlyGlobalAndStateFor(
-        new IntervalWindow(new Instant(0), new Instant(100)));
-
-    assertThat(
-        tester.extractOutput(),
-        contains(
-            isSingleWindowedValue(
-                equalTo(7), 2, 0, 100, PaneInfo.createPane(true, false, Timing.EARLY, 0, 0))));
-  }
+ }
 
   @Test
   public void testOnElementBufferingDiscarding() throws Exception {
@@ -246,52 +211,6 @@ public class ReduceFnRunnerTest {
     tester.assertHasOnlyGlobalAndFinishedSetsFor(firstWindow);
   }
 
-  /**
-   * Tests that with the default trigger we will not produce two ON_TIME panes, even
-   * if there are two outputs that are both candidates.
-   */
-  @Test
-  public void testOnlyOneOnTimePane() throws Exception {
-    WindowingStrategy<?, IntervalWindow> strategy =
-        WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10)))
-            .withTrigger(DefaultTrigger.of())
-            .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
-            .withAllowedLateness(Duration.millis(100));
-
-    ReduceFnTester<Integer, Integer, IntervalWindow> tester =
-        ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of());
-
-    tester.advanceInputWatermark(new Instant(0));
-
-    int value1 = 1;
-    int value2 = 3;
-
-    // A single element that should be in the ON_TIME output
-    tester.injectElements(
-        TimestampedValue.of(value1, new Instant(1)));
-
-    // Should fire ON_TIME
-    tester.advanceInputWatermark(new Instant(10));
-
-    // The DefaultTrigger should cause output labeled LATE, even though it does not have to be
-    // labeled as such.
-    tester.injectElements(
-        TimestampedValue.of(value2, new Instant(3)));
-
-    List<WindowedValue<Integer>> output = tester.extractOutput();
-    assertEquals(2, output.size());
-
-    assertThat(output.get(0), WindowMatchers.isWindowedValue(equalTo(value1)));
-    assertThat(output.get(1), WindowMatchers.isWindowedValue(equalTo(value1 + value2)));
-
-    assertThat(
-        output.get(0),
-        WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(true, false, Timing.ON_TIME, 0, 0)));
-    assertThat(
-        output.get(1),
-        WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, false, Timing.LATE, 1, 1)));
-  }
-
   @Test
   public void testOnElementCombiningDiscarding() throws Exception {
     // Test basic execution of a trigger using a non-combining window set and discarding mode.
@@ -331,76 +250,6 @@ public class ReduceFnRunnerTest {
   }
 
   /**
-   * Tests that when a processing time timer comes in after a window is expired
-   * it is just ignored.
-   */
-  @Test
-  public void testLateProcessingTimeTimer() throws Exception {
-    WindowingStrategy<?, IntervalWindow> strategy =
-        WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(100)))
-            .withTimestampCombiner(TimestampCombiner.EARLIEST)
-            .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
-            .withAllowedLateness(Duration.ZERO)
-            .withTrigger(
-                Repeatedly.forever(
-                    AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(10))));
-
-    ReduceFnTester<Integer, Integer, IntervalWindow> tester =
-        ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of());
-
-    tester.advanceProcessingTime(new Instant(5000));
-    injectElement(tester, 2); // processing timer @ 5000 + 10; EOW timer @ 100
-    injectElement(tester, 5);
-
-    // After this advancement, the window is expired and only the GC process
-    // should be allowed to touch it
-    tester.advanceInputWatermarkNoTimers(new Instant(100));
-
-    // This should not output
-    tester.advanceProcessingTime(new Instant(6000));
-
-    assertThat(tester.extractOutput(), emptyIterable());
-  }
-
-  /**
-   * Tests that when a processing time timer comes in after a window is expired
-   * but in the same bundle it does not cause a spurious output.
-   */
-  @Test
-  public void testCombiningAccumulatingProcessingTime() throws Exception {
-    WindowingStrategy<?, IntervalWindow> strategy =
-        WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(100)))
-            .withTimestampCombiner(TimestampCombiner.EARLIEST)
-            .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
-            .withAllowedLateness(Duration.ZERO)
-            .withTrigger(
-                Repeatedly.forever(
-                    AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(10))));
-
-    ReduceFnTester<Integer, Integer, IntervalWindow> tester =
-        ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of());
-
-    tester.advanceProcessingTime(new Instant(5000));
-    injectElement(tester, 2); // processing timer @ 5000 + 10; EOW timer @ 100
-    injectElement(tester, 5);
-
-    tester.advanceInputWatermarkNoTimers(new Instant(100));
-    tester.advanceProcessingTimeNoTimers(new Instant(5010));
-
-    // Fires the GC/EOW timer at the same time as the processing time timer.
-    tester.fireTimers(
-        new IntervalWindow(new Instant(0), new Instant(100)),
-        TimestampedValue.of(TimeDomain.EVENT_TIME, new Instant(100)),
-        TimestampedValue.of(TimeDomain.PROCESSING_TIME, new Instant(5010)));
-
-    assertThat(
-        tester.extractOutput(),
-        contains(
-            isSingleWindowedValue(
-                equalTo(7), 2, 0, 100, PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0))));
-  }
-
-  /**
    * Tests that the garbage collection time for a fixed window does not overflow the end of time.
    */
   @Test
@@ -467,67 +316,6 @@ public class ReduceFnRunnerTest {
     assertThat(tester.extractOutput(), contains(isWindowedValue(equalTo(55))));
   }
 
-  /**
-   * Tests that when a processing time timer comes in after a window is expired
-   * and GC'd it does not cause a spurious output.
-   */
-  @Test
-  public void testCombiningAccumulatingProcessingTimeSeparateBundles() throws Exception {
-    WindowingStrategy<?, IntervalWindow> strategy =
-        WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(100)))
-            .withTimestampCombiner(TimestampCombiner.EARLIEST)
-            .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
-            .withAllowedLateness(Duration.ZERO)
-            .withTrigger(
-                Repeatedly.forever(
-                    AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(10))));
-
-    ReduceFnTester<Integer, Integer, IntervalWindow> tester =
-        ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of());
-
-    tester.advanceProcessingTime(new Instant(5000));
-    injectElement(tester, 2); // processing timer @ 5000 + 10; EOW timer @ 100
-    injectElement(tester, 5);
-
-    tester.advanceInputWatermark(new Instant(100));
-    tester.advanceProcessingTime(new Instant(5011));
-
-    assertThat(
-        tester.extractOutput(),
-        contains(
-            isSingleWindowedValue(
-                equalTo(7), 2, 0, 100, PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0))));
-  }
-
-  /**
-   * Tests that if end-of-window and GC timers come in together, that the pane is correctly
-   * marked as final.
-   */
-  @Test
-  public void testCombiningAccumulatingEventTime() throws Exception {
-    WindowingStrategy<?, IntervalWindow> strategy =
-        WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(100)))
-            .withTimestampCombiner(TimestampCombiner.EARLIEST)
-            .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
-            .withAllowedLateness(Duration.millis(1))
-            .withTrigger(Repeatedly.forever(AfterWatermark.pastEndOfWindow()));
-
-    ReduceFnTester<Integer, Integer, IntervalWindow> tester =
-        ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of());
-
-    injectElement(tester, 2); // processing timer @ 5000 + 10; EOW timer @ 100
-    injectElement(tester, 5);
-
-    tester.advanceInputWatermark(new Instant(1000));
-
-    assertThat(
-        tester.extractOutput(),
-        contains(
-            isSingleWindowedValue(
-                equalTo(7), 2, 0, 100, PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0))));
-  }
-
-
   @Test
   public void testOnElementCombiningAccumulating() throws Exception {
     // Test basic execution of a trigger using a non-combining window set and accumulating mode.
@@ -1501,166 +1289,6 @@ public class ReduceFnRunnerTest {
   }
 
   /**
-   * Test that it won't fire an empty on-time pane when OnTimeBehavior is FIRE_IF_NON_EMPTY.
-   */
-  @Test
-  public void testEmptyOnTimeWithOnTimeBehaviorFireIfNonEmpty() throws Exception {
-
-    WindowingStrategy<?, IntervalWindow> strategy =
-        WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10)))
-            .withTimestampCombiner(TimestampCombiner.EARLIEST)
-            .withTrigger(
-                AfterEach.<IntervalWindow>inOrder(
-                    Repeatedly.forever(
-                        AfterProcessingTime.pastFirstElementInPane()
-                            .plusDelayOf(new Duration(5)))
-                        .orFinally(AfterWatermark.pastEndOfWindow()),
-                    Repeatedly.forever(
-                        AfterProcessingTime.pastFirstElementInPane()
-                            .plusDelayOf(new Duration(25)))))
-            .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
-            .withAllowedLateness(Duration.millis(100))
-            .withClosingBehavior(ClosingBehavior.FIRE_ALWAYS)
-            .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_IF_NON_EMPTY);
-
-    ReduceFnTester<Integer, Integer, IntervalWindow> tester =
-        ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of());
-
-    tester.advanceInputWatermark(new Instant(0));
-    tester.advanceProcessingTime(new Instant(0));
-
-    // Processing time timer for 5
-    tester.injectElements(
-        TimestampedValue.of(1, new Instant(1)),
-        TimestampedValue.of(1, new Instant(3)),
-        TimestampedValue.of(1, new Instant(7)),
-        TimestampedValue.of(1, new Instant(5)));
-
-    // Should fire early pane
-    tester.advanceProcessingTime(new Instant(6));
-
-    // Should not fire empty on time pane
-    tester.advanceInputWatermark(new Instant(11));
-
-    // Should fire final GC pane
-    tester.advanceInputWatermark(new Instant(10 + 100));
-    List<WindowedValue<Integer>> output = tester.extractOutput();
-    assertEquals(2, output.size());
-
-    assertThat(output.get(0), WindowMatchers.isSingleWindowedValue(4, 1, 0, 10));
-    assertThat(output.get(1), WindowMatchers.isSingleWindowedValue(4, 9, 0, 10));
-
-    assertThat(
-        output.get(0),
-        WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(true, false, Timing.EARLY, 0, -1)));
-    assertThat(
-        output.get(1),
-        WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, true, Timing.LATE, 1, 0)));
-  }
-
-  /**
-   * Test that it fires an empty on-time isFinished pane when OnTimeBehavior is FIRE_ALWAYS
-   * and ClosingBehavior is FIRE_IF_NON_EMPTY.
-   *
-   * <p>This is a test just for backward compatibility.
-   */
-  @Test
-  public void testEmptyOnTimeWithOnTimeBehaviorBackwardCompatibility() throws Exception {
-
-    WindowingStrategy<?, IntervalWindow> strategy =
-        WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10)))
-            .withTimestampCombiner(TimestampCombiner.EARLIEST)
-            .withTrigger(AfterWatermark.pastEndOfWindow()
-                .withEarlyFirings(AfterPane.elementCountAtLeast(1)))
-            .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
-            .withAllowedLateness(Duration.millis(0))
-            .withClosingBehavior(ClosingBehavior.FIRE_IF_NON_EMPTY);
-
-    ReduceFnTester<Integer, Integer, IntervalWindow> tester =
-        ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of());
-
-    tester.advanceInputWatermark(new Instant(0));
-    tester.advanceProcessingTime(new Instant(0));
-
-    tester.injectElements(
-        TimestampedValue.of(1, new Instant(1)));
-
-    // Should fire empty on time isFinished pane
-    tester.advanceInputWatermark(new Instant(11));
-
-    List<WindowedValue<Integer>> output = tester.extractOutput();
-    assertEquals(2, output.size());
-
-    assertThat(
-        output.get(0),
-        WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(true, false, Timing.EARLY, 0, -1)));
-    assertThat(
-        output.get(1),
-        WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, true, Timing.ON_TIME, 1, 0)));
-  }
-
-  /**
-   * Test that it won't fire an empty on-time pane when OnTimeBehavior is FIRE_IF_NON_EMPTY
-   * and when receiving late data.
-   */
-  @Test
-  public void testEmptyOnTimeWithOnTimeBehaviorFireIfNonEmptyAndLateData() throws Exception {
-
-    WindowingStrategy<?, IntervalWindow> strategy =
-        WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10)))
-            .withTimestampCombiner(TimestampCombiner.EARLIEST)
-            .withTrigger(
-                AfterEach.<IntervalWindow>inOrder(
-                    Repeatedly.forever(
-                        AfterProcessingTime.pastFirstElementInPane()
-                            .plusDelayOf(new Duration(5)))
-                        .orFinally(AfterWatermark.pastEndOfWindow()),
-                    Repeatedly.forever(
-                        AfterProcessingTime.pastFirstElementInPane()
-                            .plusDelayOf(new Duration(25)))))
-            .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
-            .withAllowedLateness(Duration.millis(100))
-            .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_IF_NON_EMPTY);
-
-    ReduceFnTester<Integer, Integer, IntervalWindow> tester =
-        ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of());
-
-    tester.advanceInputWatermark(new Instant(0));
-    tester.advanceProcessingTime(new Instant(0));
-
-    // Processing time timer for 5
-    tester.injectElements(
-        TimestampedValue.of(1, new Instant(1)),
-        TimestampedValue.of(1, new Instant(3)),
-        TimestampedValue.of(1, new Instant(7)),
-        TimestampedValue.of(1, new Instant(5)));
-
-    // Should fire early pane
-    tester.advanceProcessingTime(new Instant(6));
-
-    // Should not fire empty on time pane
-    tester.advanceInputWatermark(new Instant(11));
-
-    // Processing late data, and should fire late pane
-    tester.injectElements(
-        TimestampedValue.of(1, new Instant(9)));
-    tester.advanceProcessingTime(new Instant(6 + 25 + 1));
-
-    List<WindowedValue<Integer>> output = tester.extractOutput();
-    assertEquals(2, output.size());
-
-    assertThat(output.get(0), WindowMatchers.isSingleWindowedValue(4, 1, 0, 10));
-    assertThat(output.get(1), WindowMatchers.isSingleWindowedValue(5, 9, 0, 10));
-
-    assertThat(
-        output.get(0),
-        WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(true, false, Timing.EARLY, 0, -1)));
-    assertThat(
-        output.get(1),
-        WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, false, Timing.LATE, 1, 0)));
-  }
-
-  /**
    * Tests for processing time firings after the watermark passes the end of the window.
    * Specifically, verify the proper triggerings and pane-info of a typical speculative/on-time/late
    * when the on-time pane is non-empty.

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
index 7ca96b9..7f83eae 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
@@ -318,19 +318,6 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
   }
 
   @SafeVarargs
-  public final void assertHasOnlyGlobalAndStateFor(W... expectedWindows) {
-    assertHasOnlyGlobalAndAllowedTags(
-        ImmutableSet.copyOf(expectedWindows),
-        ImmutableSet.<StateTag<?>>of(
-            ((SystemReduceFn<?, ?, ?, ?, ?>) reduceFn).getBufferTag(),
-            TriggerStateMachineRunner.FINISHED_BITS_TAG,
-            PaneInfoTracker.PANE_INFO_TAG,
-            WatermarkHold.watermarkHoldTagForTimestampCombiner(
-                objectStrategy.getTimestampCombiner()),
-            WatermarkHold.EXTRA_HOLD_TAG));
-  }
-
-  @SafeVarargs
   public final void assertHasOnlyGlobalAndFinishedSetsAndPaneInfoFor(W... expectedWindows) {
     assertHasOnlyGlobalAndAllowedTags(
         ImmutableSet.copyOf(expectedWindows),
@@ -433,10 +420,6 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
     return result;
   }
 
-  public void advanceInputWatermarkNoTimers(Instant newInputWatermark) throws Exception {
-    timerInternals.advanceInputWatermark(newInputWatermark);
-  }
-
   /**
    * Advance the input watermark to the specified time, firing any timers that should
    * fire. Then advance the output watermark as far as possible.
@@ -468,10 +451,6 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
     runner.persist();
   }
 
-  public void advanceProcessingTimeNoTimers(Instant newProcessingTime) throws Exception {
-    timerInternals.advanceProcessingTime(newProcessingTime);
-  }
-
   /**
    * If {@link #autoAdvanceOutputWatermark} is {@literal false}, advance the output watermark
    * to the given value. Otherwise throw.
@@ -529,8 +508,8 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
     for (TimestampedValue<InputT> value : values) {
       WindowTracing.trace("TriggerTester.injectElements: {}", value);
     }
-
-    Iterable<WindowedValue<InputT>> inputs =
+    ReduceFnRunner<String, InputT, OutputT, W> runner = createRunner();
+    runner.processElements(
         Iterables.transform(
             Arrays.asList(values),
             new Function<TimestampedValue<InputT>, WindowedValue<InputT>>() {
@@ -548,12 +527,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
                   throw new RuntimeException(e);
                 }
               }
-            });
-
-    ReduceFnRunner<String, InputT, OutputT, W> runner = createRunner();
-    runner.processElements(
-        new LateDataDroppingDoFnRunner.LateDataFilter(objectStrategy, timerInternals)
-            .filter(KEY, inputs));
+            }));
 
     // Persist after each bundle.
     runner.persist();
@@ -561,27 +535,13 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
 
   public void fireTimer(W window, Instant timestamp, TimeDomain domain) throws Exception {
     ReduceFnRunner<String, InputT, OutputT, W> runner = createRunner();
-    ArrayList<TimerData> timers = new ArrayList<>(1);
+    ArrayList timers = new ArrayList(1);
     timers.add(
         TimerData.of(StateNamespaces.window(windowFn.windowCoder(), window), timestamp, domain));
     runner.onTimers(timers);
     runner.persist();
   }
 
-  public void fireTimers(W window, TimestampedValue<TimeDomain>... timers) throws Exception {
-    ReduceFnRunner<String, InputT, OutputT, W> runner = createRunner();
-    ArrayList<TimerData> timerData = new ArrayList<>(timers.length);
-    for (TimestampedValue<TimeDomain> timer : timers) {
-      timerData.add(
-          TimerData.of(
-              StateNamespaces.window(windowFn.windowCoder(), window),
-              timer.getTimestamp(),
-              timer.getValue()));
-    }
-    runner.onTimers(timerData);
-    runner.persist();
-  }
-
   /**
    * Convey the simulated state and implement {@link #outputWindowedValue} to capture all output
    * elements.

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java
index 7449af3..d242431 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java
@@ -17,9 +17,6 @@
  */
 package org.apache.beam.runners.core;
 
-import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.resume;
-import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.stop;
-import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.hasItems;
@@ -38,15 +35,16 @@ import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.concurrent.Executors;
 import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems.ProcessFn;
+import org.apache.beam.runners.core.construction.ElementAndRestriction;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.InstantCoder;
 import org.apache.beam.sdk.coders.SerializableCoder;
-import org.apache.beam.sdk.io.range.OffsetRange;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFnTester;
 import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRange;
 import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -55,7 +53,6 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.apache.beam.sdk.values.TupleTag;
@@ -114,7 +111,9 @@ public class SplittableParDoProcessFnTest {
   private static class ProcessFnTester<
           InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>>
       implements AutoCloseable {
-    private final DoFnTester<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT> tester;
+    private final DoFnTester<
+            KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT>
+        tester;
     private Instant currentProcessingTime;
 
     private InMemoryTimerInternals timerInternals;
@@ -195,13 +194,14 @@ public class SplittableParDoProcessFnTest {
     void startElement(InputT element, RestrictionT restriction) throws Exception {
       startElement(
           WindowedValue.of(
-              KV.of(element, restriction),
+              ElementAndRestriction.of(element, restriction),
               currentProcessingTime,
               GlobalWindow.INSTANCE,
               PaneInfo.ON_TIME_AND_ONLY_FIRING));
     }
 
-    void startElement(WindowedValue<KV<InputT, RestrictionT>> windowedValue) throws Exception {
+    void startElement(WindowedValue<ElementAndRestriction<InputT, RestrictionT>> windowedValue)
+        throws Exception {
       tester.processElement(
           KeyedWorkItems.elementsWorkItem("key", Collections.singletonList(windowedValue)));
     }
@@ -223,7 +223,8 @@ public class SplittableParDoProcessFnTest {
         return false;
       }
       tester.processElement(
-          KeyedWorkItems.<String, KV<InputT, RestrictionT>>timersWorkItem("key", timers));
+          KeyedWorkItems.<String, ElementAndRestriction<InputT, RestrictionT>>timersWorkItem(
+              "key", timers));
       return true;
     }
 
@@ -308,7 +309,7 @@ public class SplittableParDoProcessFnTest {
             MAX_BUNDLE_DURATION);
     tester.startElement(
         WindowedValue.of(
-            KV.of(42, new SomeRestriction()),
+            ElementAndRestriction.of(42, new SomeRestriction()),
             base,
             Collections.singletonList(w),
             PaneInfo.ON_TIME_AND_ONLY_FIRING));
@@ -368,71 +369,16 @@ public class SplittableParDoProcessFnTest {
     assertEquals(null, tester.getWatermarkHold());
   }
 
-  /** A simple splittable {@link DoFn} that outputs the given element every 5 seconds forever. */
-  private static class SelfInitiatedResumeFn extends DoFn<Integer, String> {
-    @ProcessElement
-    public ProcessContinuation process(ProcessContext c, SomeRestrictionTracker tracker) {
-      c.output(c.element().toString());
-      return resume().withResumeDelay(Duration.standardSeconds(5));
-    }
-
-    @GetInitialRestriction
-    public SomeRestriction getInitialRestriction(Integer elem) {
-      return new SomeRestriction();
-    }
-  }
-
-  @Test
-  public void testResumeSetsTimer() throws Exception {
-    DoFn<Integer, String> fn = new SelfInitiatedResumeFn();
-    Instant base = Instant.now();
-    ProcessFnTester<Integer, String, SomeRestriction, SomeRestrictionTracker> tester =
-        new ProcessFnTester<>(
-            base,
-            fn,
-            BigEndianIntegerCoder.of(),
-            SerializableCoder.of(SomeRestriction.class),
-            MAX_OUTPUTS_PER_BUNDLE,
-            MAX_BUNDLE_DURATION);
-
-    tester.startElement(42, new SomeRestriction());
-    assertThat(tester.takeOutputElements(), contains("42"));
-
-    // Should resume after 5 seconds: advancing by 3 seconds should have no effect.
-    assertFalse(tester.advanceProcessingTimeBy(Duration.standardSeconds(3)));
-    assertTrue(tester.takeOutputElements().isEmpty());
-
-    // 6 seconds should be enough - should invoke the fn again.
-    assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(3)));
-    assertThat(tester.takeOutputElements(), contains("42"));
-
-    // Should again resume after 5 seconds: advancing by 3 seconds should again have no effect.
-    assertFalse(tester.advanceProcessingTimeBy(Duration.standardSeconds(3)));
-    assertTrue(tester.takeOutputElements().isEmpty());
-
-    // 6 seconds should again be enough.
-    assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(3)));
-    assertThat(tester.takeOutputElements(), contains("42"));
-  }
-
-  /** A splittable {@link DoFn} that generates the sequence [init, init + total). */
+  /**
+   * A splittable {@link DoFn} that generates the sequence [init, init + total).
+   */
   private static class CounterFn extends DoFn<Integer, String> {
-    private final int numOutputsPerCall;
-
-    public CounterFn(int numOutputsPerCall) {
-      this.numOutputsPerCall = numOutputsPerCall;
-    }
-
     @ProcessElement
-    public ProcessContinuation process(ProcessContext c, OffsetRangeTracker tracker) {
-      for (long i = tracker.currentRestriction().getFrom(), numIterations = 0;
-          tracker.tryClaim(i); ++i, ++numIterations) {
+    public void process(ProcessContext c, OffsetRangeTracker tracker) {
+      for (long i = tracker.currentRestriction().getFrom();
+          tracker.tryClaim(i); ++i) {
         c.output(String.valueOf(c.element() + i));
-        if (numIterations == numOutputsPerCall) {
-          return resume();
-        }
       }
-      return stop();
     }
 
     @GetInitialRestriction
@@ -441,35 +387,10 @@ public class SplittableParDoProcessFnTest {
     }
   }
 
-  public void testResumeCarriesOverState() throws Exception {
-    DoFn<Integer, String> fn = new CounterFn(1);
-    Instant base = Instant.now();
-    ProcessFnTester<Integer, String, OffsetRange, OffsetRangeTracker> tester =
-        new ProcessFnTester<>(
-            base,
-            fn,
-            BigEndianIntegerCoder.of(),
-            SerializableCoder.of(OffsetRange.class),
-            MAX_OUTPUTS_PER_BUNDLE,
-            MAX_BUNDLE_DURATION);
-
-    tester.startElement(42, new OffsetRange(0, 3));
-    assertThat(tester.takeOutputElements(), contains("42"));
-    assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1)));
-    assertThat(tester.takeOutputElements(), contains("43"));
-    assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1)));
-    assertThat(tester.takeOutputElements(), contains("44"));
-    assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1)));
-    // After outputting all 3 items, should not output anything more.
-    assertEquals(0, tester.takeOutputElements().size());
-    // Should also not ask to resume.
-    assertFalse(tester.advanceProcessingTimeBy(Duration.standardSeconds(1)));
-  }
-
   @Test
   public void testCheckpointsAfterNumOutputs() throws Exception {
     int max = 100;
-    DoFn<Integer, String> fn = new CounterFn(Integer.MAX_VALUE);
+    DoFn<Integer, String> fn = new CounterFn();
     Instant base = Instant.now();
     int baseIndex = 42;
 
@@ -511,7 +432,7 @@ public class SplittableParDoProcessFnTest {
     // But bound bundle duration - the bundle should terminate.
     Duration maxBundleDuration = Duration.standardSeconds(1);
     // Create an fn that attempts to 2x output more than checkpointing allows.
-    DoFn<Integer, String> fn = new CounterFn(Integer.MAX_VALUE);
+    DoFn<Integer, String> fn = new CounterFn();
     Instant base = Instant.now();
     int baseIndex = 42;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-java/src/test/java/org/apache/beam/runners/core/StateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/StateInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/StateInternalsTest.java
deleted file mode 100644
index ae07fe6..0000000
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/StateInternalsTest.java
+++ /dev/null
@@ -1,613 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.hasItems;
-import static org.hamcrest.Matchers.not;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-
-import com.google.common.collect.Iterables;
-import java.util.Arrays;
-import java.util.Map;
-import java.util.Objects;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.state.BagState;
-import org.apache.beam.sdk.state.CombiningState;
-import org.apache.beam.sdk.state.GroupingState;
-import org.apache.beam.sdk.state.MapState;
-import org.apache.beam.sdk.state.ReadableState;
-import org.apache.beam.sdk.state.SetState;
-import org.apache.beam.sdk.state.ValueState;
-import org.apache.beam.sdk.state.WatermarkHoldState;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
-import org.hamcrest.Matchers;
-import org.joda.time.Instant;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * Tests for {@link StateInternals}.
- */
-public abstract class StateInternalsTest {
-
-  private static final BoundedWindow WINDOW_1 = new IntervalWindow(new Instant(0), new Instant(10));
-  private static final StateNamespace NAMESPACE_1 = new StateNamespaceForTest("ns1");
-  private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2");
-  private static final StateNamespace NAMESPACE_3 = new StateNamespaceForTest("ns3");
-
-  private static final StateTag<ValueState<String>> STRING_VALUE_ADDR =
-      StateTags.value("stringValue", StringUtf8Coder.of());
-  private static final StateTag<CombiningState<Integer, int[], Integer>>
-      SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal(
-          "sumInteger", VarIntCoder.of(), Sum.ofIntegers());
-  private static final StateTag<BagState<String>> STRING_BAG_ADDR =
-      StateTags.bag("stringBag", StringUtf8Coder.of());
-  private static final StateTag<SetState<String>> STRING_SET_ADDR =
-      StateTags.set("stringSet", StringUtf8Coder.of());
-  private static final StateTag<MapState<String, Integer>> STRING_MAP_ADDR =
-      StateTags.map("stringMap", StringUtf8Coder.of(), VarIntCoder.of());
-  private static final StateTag<WatermarkHoldState> WATERMARK_EARLIEST_ADDR =
-      StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST);
-  private static final StateTag<WatermarkHoldState> WATERMARK_LATEST_ADDR =
-      StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST);
-  private static final StateTag<WatermarkHoldState> WATERMARK_EOW_ADDR =
-      StateTags.watermarkStateInternal("watermark", TimestampCombiner.END_OF_WINDOW);
-
-  private StateInternals underTest;
-
-  @Before
-  public void setUp() {
-    this.underTest = createStateInternals();
-  }
-
-  protected abstract StateInternals createStateInternals();
-
-  @Test
-  public void testValue() throws Exception {
-    ValueState<String> value = underTest.state(NAMESPACE_1, STRING_VALUE_ADDR);
-
-    // State instances are cached, but depend on the namespace.
-    assertThat(underTest.state(NAMESPACE_1, STRING_VALUE_ADDR), equalTo(value));
-    assertThat(
-        underTest.state(NAMESPACE_2, STRING_VALUE_ADDR),
-        Matchers.not(equalTo(value)));
-
-    assertThat(value.read(), Matchers.nullValue());
-    value.write("hello");
-    assertThat(value.read(), equalTo("hello"));
-    value.write("world");
-    assertThat(value.read(), equalTo("world"));
-
-    value.clear();
-    assertThat(value.read(), Matchers.nullValue());
-    assertThat(underTest.state(NAMESPACE_1, STRING_VALUE_ADDR), equalTo(value));
-  }
-
-  @Test
-  public void testBag() throws Exception {
-    BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
-
-    // State instances are cached, but depend on the namespace.
-    assertThat(value, equalTo(underTest.state(NAMESPACE_1, STRING_BAG_ADDR)));
-    assertThat(value, not(equalTo(underTest.state(NAMESPACE_2, STRING_BAG_ADDR))));
-
-    assertThat(value.read(), Matchers.emptyIterable());
-    value.add("hello");
-    assertThat(value.read(), containsInAnyOrder("hello"));
-
-    value.add("world");
-    assertThat(value.read(), containsInAnyOrder("hello", "world"));
-
-    value.clear();
-    assertThat(value.read(), Matchers.emptyIterable());
-    assertThat(underTest.state(NAMESPACE_1, STRING_BAG_ADDR), equalTo(value));
-  }
-
-  @Test
-  public void testBagIsEmpty() throws Exception {
-    BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
-
-    assertThat(value.isEmpty().read(), Matchers.is(true));
-    ReadableState<Boolean> readFuture = value.isEmpty();
-    value.add("hello");
-    assertThat(readFuture.read(), Matchers.is(false));
-
-    value.clear();
-    assertThat(readFuture.read(), Matchers.is(true));
-  }
-
-  @Test
-  public void testMergeBagIntoSource() throws Exception {
-    BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
-    BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR);
-
-    bag1.add("Hello");
-    bag2.add("World");
-    bag1.add("!");
-
-    StateMerging.mergeBags(Arrays.asList(bag1, bag2), bag1);
-
-    // Reading the merged bag gets both the contents
-    assertThat(bag1.read(), containsInAnyOrder("Hello", "World", "!"));
-    assertThat(bag2.read(), Matchers.emptyIterable());
-  }
-
-  @Test
-  public void testMergeBagIntoNewNamespace() throws Exception {
-    BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
-    BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR);
-    BagState<String> bag3 = underTest.state(NAMESPACE_3, STRING_BAG_ADDR);
-
-    bag1.add("Hello");
-    bag2.add("World");
-    bag1.add("!");
-
-    StateMerging.mergeBags(Arrays.asList(bag1, bag2, bag3), bag3);
-
-    // Reading the merged bag gets both the contents
-    assertThat(bag3.read(), containsInAnyOrder("Hello", "World", "!"));
-    assertThat(bag1.read(), Matchers.emptyIterable());
-    assertThat(bag2.read(), Matchers.emptyIterable());
-  }
-
-  @Test
-  public void testSet() throws Exception {
-
-    SetState<String> value = underTest.state(NAMESPACE_1, STRING_SET_ADDR);
-
-    // State instances are cached, but depend on the namespace.
-    assertThat(value, equalTo(underTest.state(NAMESPACE_1, STRING_SET_ADDR)));
-    assertThat(value, not(equalTo(underTest.state(NAMESPACE_2, STRING_SET_ADDR))));
-
-    // empty
-    assertThat(value.read(), Matchers.emptyIterable());
-    assertFalse(value.contains("A").read());
-
-    // add
-    value.add("A");
-    value.add("B");
-    value.add("A");
-    assertFalse(value.addIfAbsent("B").read());
-    assertThat(value.read(), containsInAnyOrder("A", "B"));
-
-    // remove
-    value.remove("A");
-    assertThat(value.read(), containsInAnyOrder("B"));
-    value.remove("C");
-    assertThat(value.read(), containsInAnyOrder("B"));
-
-    // contains
-    assertFalse(value.contains("A").read());
-    assertTrue(value.contains("B").read());
-    value.add("C");
-    value.add("D");
-
-    // readLater
-    assertThat(value.readLater().read(), containsInAnyOrder("B", "C", "D"));
-    SetState<String> later = value.readLater();
-    assertThat(later.read(), hasItems("C", "D"));
-    assertFalse(later.contains("A").read());
-
-    // clear
-    value.clear();
-    assertThat(value.read(), Matchers.emptyIterable());
-    assertThat(underTest.state(NAMESPACE_1, STRING_SET_ADDR), equalTo(value));
-
-  }
-
-  @Test
-  public void testSetIsEmpty() throws Exception {
-
-    SetState<String> value = underTest.state(NAMESPACE_1, STRING_SET_ADDR);
-
-    assertThat(value.isEmpty().read(), Matchers.is(true));
-    ReadableState<Boolean> readFuture = value.isEmpty();
-    value.add("hello");
-    assertThat(readFuture.read(), Matchers.is(false));
-
-    value.clear();
-    assertThat(readFuture.read(), Matchers.is(true));
-  }
-
-  @Test
-  public void testMergeSetIntoSource() throws Exception {
-
-    SetState<String> set1 = underTest.state(NAMESPACE_1, STRING_SET_ADDR);
-    SetState<String> set2 = underTest.state(NAMESPACE_2, STRING_SET_ADDR);
-
-    set1.add("Hello");
-    set2.add("Hello");
-    set2.add("World");
-    set1.add("!");
-
-    StateMerging.mergeSets(Arrays.asList(set1, set2), set1);
-
-    // Reading the merged set gets both the contents
-    assertThat(set1.read(), containsInAnyOrder("Hello", "World", "!"));
-    assertThat(set2.read(), Matchers.emptyIterable());
-  }
-
-  @Test
-  public void testMergeSetIntoNewNamespace() throws Exception {
-
-    SetState<String> set1 = underTest.state(NAMESPACE_1, STRING_SET_ADDR);
-    SetState<String> set2 = underTest.state(NAMESPACE_2, STRING_SET_ADDR);
-    SetState<String> set3 = underTest.state(NAMESPACE_3, STRING_SET_ADDR);
-
-    set1.add("Hello");
-    set2.add("Hello");
-    set2.add("World");
-    set1.add("!");
-
-    StateMerging.mergeSets(Arrays.asList(set1, set2, set3), set3);
-
-    // Reading the merged set gets both the contents
-    assertThat(set3.read(), containsInAnyOrder("Hello", "World", "!"));
-    assertThat(set1.read(), Matchers.emptyIterable());
-    assertThat(set2.read(), Matchers.emptyIterable());
-  }
-
-  // for testMap
-  private static class MapEntry<K, V> implements Map.Entry<K, V> {
-    private K key;
-    private V value;
-
-    private MapEntry(K key, V value) {
-      this.key = key;
-      this.value = value;
-    }
-
-    static <K, V> Map.Entry<K, V> of(K k, V v) {
-      return new MapEntry<>(k, v);
-    }
-
-    public final K getKey() {
-      return key;
-    }
-    public final V getValue() {
-      return value;
-    }
-
-    public final String toString() {
-      return key + "=" + value;
-    }
-
-    public final int hashCode() {
-      return Objects.hashCode(key) ^ Objects.hashCode(value);
-    }
-
-    public final V setValue(V newValue) {
-      V oldValue = value;
-      value = newValue;
-      return oldValue;
-    }
-
-    public final boolean equals(Object o) {
-      if (o == this) {
-        return true;
-      }
-      if (o instanceof Map.Entry) {
-        Map.Entry<?, ?> e = (Map.Entry<?, ?>) o;
-        if (Objects.equals(key, e.getKey())
-            && Objects.equals(value, e.getValue())) {
-          return true;
-        }
-      }
-      return false;
-    }
-  }
-
-  @Test
-  public void testMap() throws Exception {
-
-    MapState<String, Integer> value = underTest.state(NAMESPACE_1, STRING_MAP_ADDR);
-
-    // State instances are cached, but depend on the namespace.
-    assertThat(value, equalTo(underTest.state(NAMESPACE_1, STRING_MAP_ADDR)));
-    assertThat(value, not(equalTo(underTest.state(NAMESPACE_2, STRING_MAP_ADDR))));
-
-    // put
-    assertThat(value.entries().read(), Matchers.emptyIterable());
-    value.put("A", 1);
-    value.put("B", 2);
-    value.put("A", 11);
-    assertThat(value.putIfAbsent("B", 22).read(), equalTo(2));
-    assertThat(value.entries().read(), containsInAnyOrder(MapEntry.of("A", 11),
-        MapEntry.of("B", 2)));
-
-    // remove
-    value.remove("A");
-    assertThat(value.entries().read(), containsInAnyOrder(MapEntry.of("B", 2)));
-    value.remove("C");
-    assertThat(value.entries().read(), containsInAnyOrder(MapEntry.of("B", 2)));
-
-    // get
-    assertNull(value.get("A").read());
-    assertThat(value.get("B").read(), equalTo(2));
-    value.put("C", 3);
-    value.put("D", 4);
-    assertThat(value.get("C").read(), equalTo(3));
-
-    // iterate
-    value.put("E", 5);
-    value.remove("C");
-    assertThat(value.keys().read(), containsInAnyOrder("B", "D", "E"));
-    assertThat(value.values().read(), containsInAnyOrder(2, 4, 5));
-    assertThat(
-        value.entries().read(),
-        containsInAnyOrder(MapEntry.of("B", 2), MapEntry.of("D", 4), MapEntry.of("E", 5)));
-
-    // readLater
-    assertThat(value.get("B").readLater().read(), equalTo(2));
-    assertNull(value.get("A").readLater().read());
-    assertThat(
-        value.entries().readLater().read(),
-        containsInAnyOrder(MapEntry.of("B", 2), MapEntry.of("D", 4), MapEntry.of("E", 5)));
-
-    // clear
-    value.clear();
-    assertThat(value.entries().read(), Matchers.emptyIterable());
-    assertThat(underTest.state(NAMESPACE_1, STRING_MAP_ADDR), equalTo(value));
-  }
-
-  @Test
-  public void testCombiningValue() throws Exception {
-
-    GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
-
-    // State instances are cached, but depend on the namespace.
-    assertEquals(value, underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR));
-    assertFalse(value.equals(underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR)));
-
-    assertThat(value.read(), equalTo(0));
-    value.add(2);
-    assertThat(value.read(), equalTo(2));
-
-    value.add(3);
-    assertThat(value.read(), equalTo(5));
-
-    value.clear();
-    assertThat(value.read(), equalTo(0));
-    assertThat(underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR), equalTo(value));
-  }
-
-  @Test
-  public void testCombiningIsEmpty() throws Exception {
-    GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
-
-    assertThat(value.isEmpty().read(), Matchers.is(true));
-    ReadableState<Boolean> readFuture = value.isEmpty();
-    value.add(5);
-    assertThat(readFuture.read(), Matchers.is(false));
-
-    value.clear();
-    assertThat(readFuture.read(), Matchers.is(true));
-  }
-
-  @Test
-  public void testMergeCombiningValueIntoSource() throws Exception {
-    CombiningState<Integer, int[], Integer> value1 =
-        underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
-    CombiningState<Integer, int[], Integer> value2 =
-        underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
-
-    value1.add(5);
-    value2.add(10);
-    value1.add(6);
-
-    assertThat(value1.read(), equalTo(11));
-    assertThat(value2.read(), equalTo(10));
-
-    // Merging clears the old values and updates the result value.
-    StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value1);
-
-    assertThat(value1.read(), equalTo(21));
-    assertThat(value2.read(), equalTo(0));
-  }
-
-  @Test
-  public void testMergeCombiningValueIntoNewNamespace() throws Exception {
-    CombiningState<Integer, int[], Integer> value1 =
-        underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
-    CombiningState<Integer, int[], Integer> value2 =
-        underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
-    CombiningState<Integer, int[], Integer> value3 =
-        underTest.state(NAMESPACE_3, SUM_INTEGER_ADDR);
-
-    value1.add(5);
-    value2.add(10);
-    value1.add(6);
-
-    StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value3);
-
-    // Merging clears the old values and updates the result value.
-    assertThat(value1.read(), equalTo(0));
-    assertThat(value2.read(), equalTo(0));
-    assertThat(value3.read(), equalTo(21));
-  }
-
-  @Test
-  public void testWatermarkEarliestState() throws Exception {
-    WatermarkHoldState value =
-        underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
-
-    // State instances are cached, but depend on the namespace.
-    assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR));
-    assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR)));
-
-    assertThat(value.read(), Matchers.nullValue());
-    value.add(new Instant(2000));
-    assertThat(value.read(), equalTo(new Instant(2000)));
-
-    value.add(new Instant(3000));
-    assertThat(value.read(), equalTo(new Instant(2000)));
-
-    value.add(new Instant(1000));
-    assertThat(value.read(), equalTo(new Instant(1000)));
-
-    value.clear();
-    assertThat(value.read(), equalTo(null));
-    assertThat(underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR), equalTo(value));
-  }
-
-  @Test
-  public void testWatermarkLatestState() throws Exception {
-    WatermarkHoldState value =
-        underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
-
-    // State instances are cached, but depend on the namespace.
-    assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR));
-    assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR)));
-
-    assertThat(value.read(), Matchers.nullValue());
-    value.add(new Instant(2000));
-    assertThat(value.read(), equalTo(new Instant(2000)));
-
-    value.add(new Instant(3000));
-    assertThat(value.read(), equalTo(new Instant(3000)));
-
-    value.add(new Instant(1000));
-    assertThat(value.read(), equalTo(new Instant(3000)));
-
-    value.clear();
-    assertThat(value.read(), equalTo(null));
-    assertThat(underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR), equalTo(value));
-  }
-
-  @Test
-  public void testWatermarkEndOfWindowState() throws Exception {
-    WatermarkHoldState value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR);
-
-    // State instances are cached, but depend on the namespace.
-    assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR));
-    assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_EOW_ADDR)));
-
-    assertThat(value.read(), Matchers.nullValue());
-    value.add(new Instant(2000));
-    assertThat(value.read(), equalTo(new Instant(2000)));
-
-    value.clear();
-    assertThat(value.read(), equalTo(null));
-    assertThat(underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR), equalTo(value));
-  }
-
-  @Test
-  public void testWatermarkStateIsEmpty() throws Exception {
-    WatermarkHoldState value =
-        underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
-
-    assertThat(value.isEmpty().read(), Matchers.is(true));
-    ReadableState<Boolean> readFuture = value.isEmpty();
-    value.add(new Instant(1000));
-    assertThat(readFuture.read(), Matchers.is(false));
-
-    value.clear();
-    assertThat(readFuture.read(), Matchers.is(true));
-  }
-
-  @Test
-  public void testMergeEarliestWatermarkIntoSource() throws Exception {
-    WatermarkHoldState value1 =
-        underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
-    WatermarkHoldState value2 =
-        underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR);
-
-    value1.add(new Instant(3000));
-    value2.add(new Instant(5000));
-    value1.add(new Instant(4000));
-    value2.add(new Instant(2000));
-
-    // Merging clears the old values and updates the merged value.
-    StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value1, WINDOW_1);
-
-    assertThat(value1.read(), equalTo(new Instant(2000)));
-    assertThat(value2.read(), equalTo(null));
-  }
-
-  @Test
-  public void testMergeLatestWatermarkIntoSource() throws Exception {
-    WatermarkHoldState value1 =
-        underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
-    WatermarkHoldState value2 =
-        underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR);
-    WatermarkHoldState value3 =
-        underTest.state(NAMESPACE_3, WATERMARK_LATEST_ADDR);
-
-    value1.add(new Instant(3000));
-    value2.add(new Instant(5000));
-    value1.add(new Instant(4000));
-    value2.add(new Instant(2000));
-
-    // Merging clears the old values and updates the result value.
-    StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value3, WINDOW_1);
-
-    // Merging clears the old values and updates the result value.
-    assertThat(value3.read(), equalTo(new Instant(5000)));
-    assertThat(value1.read(), equalTo(null));
-    assertThat(value2.read(), equalTo(null));
-  }
-
-  @Test
-  public void testSetReadable() throws Exception {
-    SetState<String> value = underTest.state(NAMESPACE_1, STRING_SET_ADDR);
-
-    // test contains
-    ReadableState<Boolean> readable = value.contains("A");
-    value.add("A");
-    assertFalse(readable.read());
-
-    // test addIfAbsent
-    value.addIfAbsent("B");
-    assertTrue(value.contains("B").read());
-  }
-
-  @Test
-  public void testMapReadable() throws Exception {
-    MapState<String, Integer> value = underTest.state(NAMESPACE_1, STRING_MAP_ADDR);
-
-    // test iterable, should just return a iterable view of the values contained in this map.
-    // The iterable is backed by the map, so changes to the map are reflected in the iterable.
-    ReadableState<Iterable<String>> keys = value.keys();
-    ReadableState<Iterable<Integer>> values = value.values();
-    ReadableState<Iterable<Map.Entry<String, Integer>>> entries = value.entries();
-    value.put("A", 1);
-    assertFalse(Iterables.isEmpty(keys.read()));
-    assertFalse(Iterables.isEmpty(values.read()));
-    assertFalse(Iterables.isEmpty(entries.read()));
-
-    // test get
-    ReadableState<Integer> get = value.get("B");
-    value.put("B", 2);
-    assertNull(get.read());
-
-    // test addIfAbsent
-    value.putIfAbsent("C", 3);
-    assertThat(value.get("C").read(), equalTo(3));
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-java/src/test/java/org/apache/beam/runners/core/WindowMatchers.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/WindowMatchers.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/WindowMatchers.java
index 26cbfee..9769d10 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/WindowMatchers.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/WindowMatchers.java
@@ -116,21 +116,6 @@ public class WindowMatchers {
   }
 
   public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue(
-      Matcher<T> valueMatcher,
-      long timestamp,
-      long windowStart,
-      long windowEnd,
-      PaneInfo paneInfo) {
-    IntervalWindow intervalWindow =
-        new IntervalWindow(new Instant(windowStart), new Instant(windowEnd));
-    return WindowMatchers.<T>isSingleWindowedValue(
-        valueMatcher,
-        Matchers.describedAs("%0", Matchers.equalTo(new Instant(timestamp)), timestamp),
-        Matchers.<BoundedWindow>equalTo(intervalWindow),
-        Matchers.equalTo(paneInfo));
-  }
-
-  public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue(
       Matcher<? super T> valueMatcher,
       Matcher<? super Instant> timestampMatcher,
       Matcher<? super BoundedWindow> windowMatcher) {

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachineTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachineTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachineTest.java
index 2be90de..453c8ff 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachineTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachineTest.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.when;
 
+import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine;
 import org.apache.beam.runners.core.triggers.TriggerStateMachineTester.SimpleTriggerStateMachineTester;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
@@ -41,8 +42,8 @@ import org.mockito.MockitoAnnotations;
 @RunWith(JUnit4.class)
 public class AfterFirstStateMachineTest {
 
-  @Mock private TriggerStateMachine mockTrigger1;
-  @Mock private TriggerStateMachine mockTrigger2;
+  @Mock private OnceTriggerStateMachine mockTrigger1;
+  @Mock private OnceTriggerStateMachine mockTrigger2;
   private SimpleTriggerStateMachineTester<IntervalWindow> tester;
   private static TriggerStateMachine.TriggerContext anyTriggerContext() {
     return Mockito.<TriggerStateMachine.TriggerContext>any();


Mime
View raw message