beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [1/2] beam git commit: BEAM-2022 fix triggering for processing time timers
Date Sun, 30 Apr 2017 15:44:30 GMT
Repository: beam
Updated Branches:
  refs/heads/master fc55d2f81 -> 202aae9d3


BEAM-2022 fix triggering for processing time timers


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/eb860388
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/eb860388
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/eb860388

Branch: refs/heads/master
Commit: eb860388a8626837655e82171c8480421384e419
Parents: 2b6cb8c
Author: Thomas Weise <thw@apache.org>
Authored: Sat Apr 29 01:17:22 2017 -0700
Committer: Thomas Weise <thw@apache.org>
Committed: Sat Apr 29 01:17:22 2017 -0700

----------------------------------------------------------------------
 .../operators/ApexGroupByKeyOperator.java       |  41 ++-
 .../operators/ApexTimerInternals.java           | 155 +++++---
 .../translation/ApexStateInternalsTest.java     | 368 -------------------
 .../operators/ApexTimerInternalsTest.java       |  78 +++-
 .../utils/ApexStateInternalsTest.java           | 367 ++++++++++++++++++
 5 files changed, 567 insertions(+), 442 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/eb860388/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
index f8b6653..3c9f5ab 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
@@ -25,11 +25,9 @@ import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.Operator;
 import com.datatorrent.api.StreamCodec;
 import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
-import com.datatorrent.netlet.util.Slice;
 import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
 import com.esotericsoftware.kryo.serializers.JavaSerializer;
 import com.google.common.base.Throwables;
-import com.google.common.collect.Multimap;
 import java.util.Collection;
 import java.util.Collections;
 import org.apache.beam.runners.apex.ApexPipelineOptions;
@@ -41,6 +39,7 @@ import org.apache.beam.runners.core.ReduceFnRunner;
 import org.apache.beam.runners.core.StateInternalsFactory;
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.runners.core.construction.Triggers;
 import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
 import org.apache.beam.runners.core.triggers.TriggerStateMachines;
@@ -49,8 +48,8 @@ import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.NullSideInputReader;
+import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
@@ -68,7 +67,8 @@ import org.slf4j.LoggerFactory;
  * @param <K> key type
  * @param <V> value type
  */
-public class ApexGroupByKeyOperator<K, V> implements Operator {
+public class ApexGroupByKeyOperator<K, V> implements Operator,
+    ApexTimerInternals.TimerProcessor<K> {
   private static final Logger LOG = LoggerFactory.getLogger(ApexGroupByKeyOperator.class);
   private boolean traceTuples = true;
 
@@ -106,7 +106,7 @@ public class ApexGroupByKeyOperator<K, V> implements Operator {
         }
         processElement(t.getValue());
       } catch (Exception e) {
-        Throwables.propagateIfPossible(e);
+        Throwables.throwIfUnchecked(e);
         throw new RuntimeException(e);
       }
     }
@@ -143,6 +143,8 @@ public class ApexGroupByKeyOperator<K, V> implements Operator {
 
   @Override
   public void endWindow() {
+    timerInternals.fireReadyTimers(timerInternals.currentProcessingTime().getMillis(),
+        this, TimeDomain.PROCESSING_TIME);
   }
 
   @Override
@@ -195,7 +197,6 @@ public class ApexGroupByKeyOperator<K, V> implements Operator {
         serializedOptions.get());
   }
 
-
   private void processElement(WindowedValue<KV<K, V>> windowedValue) throws Exception {
     final KV<K, V> kv = windowedValue.getValue();
     final WindowedValue<V> updatedWindowedValue = WindowedValue.of(kv.getValue(),
@@ -209,19 +210,23 @@ public class ApexGroupByKeyOperator<K, V> implements Operator {
     reduceFnRunner.persist();
   }
 
-  private void processWatermark(ApexStreamTuple.WatermarkTuple<?> mark) throws Exception {
-    this.inputWatermark = new Instant(mark.getTimestamp());
-    Multimap<Slice, TimerInternals.TimerData> timers = timerInternals.getTimersReadyToProcess(
-        mark.getTimestamp());
-    if (!timers.isEmpty()) {
-      for (Slice keyBytes : timers.keySet()) {
-        K key = CoderUtils.decodeFromByteArray(keyCoder, keyBytes.buffer);
-        timerInternals.setContext(key, keyCoder, inputWatermark);
-        ReduceFnRunner<K, V, Iterable<V>, BoundedWindow> reduceFnRunner = newReduceFnRunner(key);
-        reduceFnRunner.onTimers(timers.get(keyBytes));
-        reduceFnRunner.persist();
-      }
+  @Override
+  public void fireTimer(K key, Collection<TimerData> timerData) {
+    timerInternals.setContext(key, keyCoder, inputWatermark);
+    ReduceFnRunner<K, V, Iterable<V>, BoundedWindow> reduceFnRunner = newReduceFnRunner(key);
+    try {
+      reduceFnRunner.onTimers(timerData);
+    } catch (Exception e) {
+      Throwables.throwIfUnchecked(e);
+      throw new RuntimeException(e);
     }
+    reduceFnRunner.persist();
+  }
+
+  private void processWatermark(ApexStreamTuple.WatermarkTuple<?> mark) {
+    this.inputWatermark = new Instant(mark.getTimestamp());
+    timerInternals.fireReadyTimers(this.inputWatermark.getMillis(),
+        this, TimeDomain.EVENT_TIME);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/eb860388/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternals.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternals.java
index b142095..15ccbee 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternals.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternals.java
@@ -20,9 +20,12 @@ package org.apache.beam.runners.apex.translation.operators;
 import com.datatorrent.netlet.util.Slice;
 import com.esotericsoftware.kryo.DefaultSerializer;
 import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ComparisonChain;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
 import java.io.Serializable;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -47,14 +50,16 @@ import org.joda.time.Instant;
 @DefaultSerializer(JavaSerializer.class)
 class ApexTimerInternals<K> implements TimerInternals, Serializable {
 
-  private Map<Slice, Set<Slice>> activeTimers = new HashMap<>();
-  private TimerDataCoder timerDataCoder;
+  private final TimerSet eventTimeTimeTimers;
+  private final TimerSet processingTimeTimers;
+
   private transient K currentKey;
   private transient Instant currentInputWatermark;
   private transient Coder<K> keyCoder;
 
   public ApexTimerInternals(TimerDataCoder timerDataCoder) {
-    this.timerDataCoder = timerDataCoder;
+    this.eventTimeTimeTimers = new TimerSet(timerDataCoder);
+    this.processingTimeTimers = new TimerSet(timerDataCoder);
   }
 
   public void setContext(K key, Coder<K> keyCoder, Instant inputWatermark) {
@@ -63,31 +68,37 @@ class ApexTimerInternals<K> implements TimerInternals, Serializable {
     this.currentInputWatermark = inputWatermark;
   }
 
+  @VisibleForTesting
+  protected TimerSet getTimerSet(TimeDomain domain) {
+    return (domain == TimeDomain.EVENT_TIME) ? eventTimeTimeTimers : processingTimeTimers;
+  }
+
   @Override
   public void setTimer(StateNamespace namespace, String timerId, Instant target,
       TimeDomain timeDomain) {
     TimerData timerData = TimerData.of(timerId, namespace, target, timeDomain);
-    registerActiveTimer(currentKey, timerData);
+    setTimer(timerData);
   }
 
   @Override
   public void setTimer(TimerData timerData) {
-    registerActiveTimer(currentKey, timerData);
+    getTimerSet(timerData.getDomain()).addTimer(getKeyBytes(this.currentKey), timerData);
   }
 
   @Override
   public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) {
-    throw new UnsupportedOperationException();
+    getTimerSet(timeDomain).deleteTimer(getKeyBytes(this.currentKey), namespace, timerId);
   }
 
   @Override
   public void deleteTimer(StateNamespace namespace, String timerId) {
-    throw new UnsupportedOperationException();
+    this.eventTimeTimeTimers.deleteTimer(getKeyBytes(this.currentKey), namespace, timerId);
+    this.processingTimeTimers.deleteTimer(getKeyBytes(this.currentKey), namespace, timerId);
   }
 
   @Override
   public void deleteTimer(TimerData timerKey) {
-    unregisterActiveTimer(currentKey, timerKey);
+    getTimerSet(timerKey.getDomain()).deleteTimer(getKeyBytes(this.currentKey), timerKey);
   }
 
   @Override
@@ -102,7 +113,7 @@ class ApexTimerInternals<K> implements TimerInternals, Serializable {
 
   @Override
   public Instant currentInputWatermarkTime() {
-    return new Instant(currentInputWatermark);
+    return currentInputWatermark;
   }
 
   @Override
@@ -110,14 +121,17 @@ class ApexTimerInternals<K> implements TimerInternals, Serializable {
     return null;
   }
 
+  public interface TimerProcessor<K> {
+    void fireTimer(K key, Collection<TimerData> timerData);
+  }
+
   /**
-   * Returns the list of timers that are ready to fire. These are the timers
-   * that are registered to be triggered at a time before the current watermark.
-   * We keep these timers in a Set, so that they are deduplicated, as the same
-   * timer can be registered multiple times.
+   * Fire the timers that are ready. These are the timers
+   * that are registered to be triggered at a time before the current time.
    */
-  public Multimap<Slice, TimerInternals.TimerData> getTimersReadyToProcess(
-      long currentWatermark) {
+  public void fireReadyTimers(long currentTime,
+      TimerProcessor<K> timerProcessor, TimeDomain timeDomain) {
+    TimerSet timers = getTimerSet(timeDomain);
 
     // we keep the timers to return in a different list and launch them later
     // because we cannot prevent a trigger from registering another timer,
@@ -125,16 +139,16 @@ class ApexTimerInternals<K> implements TimerInternals, Serializable {
     Multimap<Slice, TimerInternals.TimerData> toFire = HashMultimap.create();
 
     Iterator<Map.Entry<Slice, Set<Slice>>> it =
-        activeTimers.entrySet().iterator();
+        timers.activeTimers.entrySet().iterator();
     while (it.hasNext()) {
       Map.Entry<Slice, Set<Slice>> keyWithTimers = it.next();
 
       Iterator<Slice> timerIt = keyWithTimers.getValue().iterator();
       while (timerIt.hasNext()) {
         try {
-          TimerData timerData = CoderUtils.decodeFromByteArray(timerDataCoder,
+          TimerData timerData = CoderUtils.decodeFromByteArray(timers.timerDataCoder,
               timerIt.next().buffer);
-          if (timerData.getTimestamp().isBefore(currentWatermark)) {
+          if (timerData.getTimestamp().isBefore(currentTime)) {
             toFire.put(keyWithTimers.getKey(), timerData);
             timerIt.remove();
           }
@@ -147,55 +161,106 @@ class ApexTimerInternals<K> implements TimerInternals, Serializable {
         it.remove();
       }
     }
-    return toFire;
-  }
 
-  private void registerActiveTimer(K key, TimerData timer) {
-    final Slice keyBytes;
-    try {
-      keyBytes = new Slice(CoderUtils.encodeToByteArray(keyCoder, key));
-    } catch (CoderException e) {
-      throw new RuntimeException(e);
-    }
-    Set<Slice> timersForKey = activeTimers.get(keyBytes);
-    if (timersForKey == null) {
-      timersForKey = new HashSet<>();
+    // fire ready timers
+    if (!toFire.isEmpty()) {
+      for (Slice keyBytes : toFire.keySet()) {
+        try {
+          K key = CoderUtils.decodeFromByteArray(keyCoder, keyBytes.buffer);
+          timerProcessor.fireTimer(key, toFire.get(keyBytes));
+        } catch (CoderException e) {
+          throw new RuntimeException(e);
+        }
+      }
     }
+  }
 
+  private Slice getKeyBytes(K key) {
     try {
-      Slice timerBytes = new Slice(CoderUtils.encodeToByteArray(timerDataCoder, timer));
-      timersForKey.add(timerBytes);
+      return new Slice(CoderUtils.encodeToByteArray(keyCoder, key));
     } catch (CoderException e) {
       throw new RuntimeException(e);
     }
-
-    activeTimers.put(keyBytes, timersForKey);
   }
 
-  private void unregisterActiveTimer(K key, TimerData timer) {
-    final Slice keyBytes;
-    try {
-      keyBytes = new Slice(CoderUtils.encodeToByteArray(keyCoder, key));
-    } catch (CoderException e) {
-      throw new RuntimeException(e);
+  protected static class TimerSet implements Serializable {
+    private final Map<Slice, Set<Slice>> activeTimers = new HashMap<>();
+    private final TimerDataCoder timerDataCoder;
+
+    protected TimerSet(TimerDataCoder timerDataCoder) {
+      this.timerDataCoder = timerDataCoder;
     }
 
-    Set<Slice> timersForKey = activeTimers.get(keyBytes);
-    if (timersForKey != null) {
+    public void addTimer(Slice keyBytes, TimerData timer) {
+      Set<Slice> timersForKey = activeTimers.get(keyBytes);
+      if (timersForKey == null) {
+        timersForKey = new HashSet<>();
+      }
+
       try {
         Slice timerBytes = new Slice(CoderUtils.encodeToByteArray(timerDataCoder, timer));
         timersForKey.add(timerBytes);
-        timersForKey.remove(timerBytes);
       } catch (CoderException e) {
         throw new RuntimeException(e);
       }
 
+      activeTimers.put(keyBytes, timersForKey);
+    }
+
+    public void deleteTimer(Slice keyBytes, StateNamespace namespace, String timerId) {
+      Set<Slice> timersForKey = activeTimers.get(keyBytes);
+      if (timersForKey == null) {
+        return;
+      }
+
+      Iterator<Slice> timerIt = timersForKey.iterator();
+      while (timerIt.hasNext()) {
+        try {
+          TimerData timerData = CoderUtils.decodeFromByteArray(timerDataCoder,
+              timerIt.next().buffer);
+          ComparisonChain chain =
+              ComparisonChain.start().compare(timerData.getTimerId(), timerId);
+          if (chain.result() == 0 && !timerData.getNamespace().equals(namespace)) {
+            // Obtaining the stringKey may be expensive; only do so if required
+            chain = chain.compare(timerData.getNamespace().stringKey(), namespace.stringKey());
+          }
+          if (chain.result() == 0) {
+            timerIt.remove();
+          }
+        } catch (CoderException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
       if (timersForKey.isEmpty()) {
         activeTimers.remove(keyBytes);
-      } else {
-        activeTimers.put(keyBytes, timersForKey);
       }
     }
+
+    public void deleteTimer(Slice keyBytes, TimerData timerKey) {
+      Set<Slice> timersForKey = activeTimers.get(keyBytes);
+      if (timersForKey != null) {
+        try {
+          Slice timerBytes = new Slice(CoderUtils.encodeToByteArray(timerDataCoder, timerKey));
+          timersForKey.add(timerBytes);
+          timersForKey.remove(timerBytes);
+        } catch (CoderException e) {
+          throw new RuntimeException(e);
+        }
+
+        if (timersForKey.isEmpty()) {
+          activeTimers.remove(keyBytes);
+        } else {
+          activeTimers.put(keyBytes, timersForKey);
+        }
+      }
+    }
+
+    @VisibleForTesting
+    protected Map<Slice, Set<Slice>> getMap() {
+      return activeTimers;
+    }
+
   }
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/eb860388/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexStateInternalsTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexStateInternalsTest.java
deleted file mode 100644
index 091fe3b..0000000
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexStateInternalsTest.java
+++ /dev/null
@@ -1,368 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.apex.translation;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThat;
-
-import com.datatorrent.lib.util.KryoCloneUtils;
-import java.util.Arrays;
-import org.apache.beam.runners.apex.translation.utils.ApexStateInternals;
-import org.apache.beam.runners.apex.translation.utils.ApexStateInternals.ApexStateBackend;
-import org.apache.beam.runners.apex.translation.utils.ApexStateInternals.ApexStateInternalsFactory;
-import org.apache.beam.runners.core.StateMerging;
-import org.apache.beam.runners.core.StateNamespace;
-import org.apache.beam.runners.core.StateNamespaceForTest;
-import org.apache.beam.runners.core.StateTag;
-import org.apache.beam.runners.core.StateTags;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
-import org.apache.beam.sdk.util.state.BagState;
-import org.apache.beam.sdk.util.state.CombiningState;
-import org.apache.beam.sdk.util.state.GroupingState;
-import org.apache.beam.sdk.util.state.ReadableState;
-import org.apache.beam.sdk.util.state.ValueState;
-import org.apache.beam.sdk.util.state.WatermarkHoldState;
-import org.hamcrest.Matchers;
-import org.joda.time.Instant;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * Tests for {@link ApexStateInternals}. This is based on the tests for
- * {@code InMemoryStateInternals}.
- */
-public class ApexStateInternalsTest {
-  private static final BoundedWindow WINDOW_1 = new IntervalWindow(new Instant(0), new Instant(10));
-  private static final StateNamespace NAMESPACE_1 = new StateNamespaceForTest("ns1");
-  private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2");
-  private static final StateNamespace NAMESPACE_3 = new StateNamespaceForTest("ns3");
-
-  private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR =
-      StateTags.value("stringValue", StringUtf8Coder.of());
-  private static final StateTag<Object, CombiningState<Integer, int[], Integer>>
-      SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal(
-          "sumInteger", VarIntCoder.of(), Sum.ofIntegers());
-  private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
-      StateTags.bag("stringBag", StringUtf8Coder.of());
-  private static final StateTag<Object, WatermarkHoldState>
-      WATERMARK_EARLIEST_ADDR =
-      StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST);
-  private static final StateTag<Object, WatermarkHoldState> WATERMARK_LATEST_ADDR =
-      StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST);
-  private static final StateTag<Object, WatermarkHoldState> WATERMARK_EOW_ADDR =
-      StateTags.watermarkStateInternal("watermark", TimestampCombiner.END_OF_WINDOW);
-
-  private ApexStateInternals<String> underTest;
-
-  @Before
-  public void initStateInternals() {
-    underTest = new ApexStateInternals.ApexStateBackend()
-        .newStateInternalsFactory(StringUtf8Coder.of())
-        .stateInternalsForKey((String) null);
-  }
-
-  @Test
-  public void testBag() throws Exception {
-    BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
-
-    assertEquals(value, underTest.state(NAMESPACE_1, STRING_BAG_ADDR));
-    assertFalse(value.equals(underTest.state(NAMESPACE_2, STRING_BAG_ADDR)));
-
-    assertThat(value.read(), Matchers.emptyIterable());
-    value.add("hello");
-    assertThat(value.read(), Matchers.containsInAnyOrder("hello"));
-
-    value.add("world");
-    assertThat(value.read(), Matchers.containsInAnyOrder("hello", "world"));
-
-    value.clear();
-    assertThat(value.read(), Matchers.emptyIterable());
-    assertEquals(underTest.state(NAMESPACE_1, STRING_BAG_ADDR), value);
-
-  }
-
-  @Test
-  public void testBagIsEmpty() throws Exception {
-    BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
-
-    assertThat(value.isEmpty().read(), Matchers.is(true));
-    ReadableState<Boolean> readFuture = value.isEmpty();
-    value.add("hello");
-    assertThat(readFuture.read(), Matchers.is(false));
-
-    value.clear();
-    assertThat(readFuture.read(), Matchers.is(true));
-  }
-
-  @Test
-  public void testMergeBagIntoSource() throws Exception {
-    BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
-    BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR);
-
-    bag1.add("Hello");
-    bag2.add("World");
-    bag1.add("!");
-
-    StateMerging.mergeBags(Arrays.asList(bag1, bag2), bag1);
-
-    // Reading the merged bag gets both the contents
-    assertThat(bag1.read(), Matchers.containsInAnyOrder("Hello", "World", "!"));
-    assertThat(bag2.read(), Matchers.emptyIterable());
-  }
-
-  @Test
-  public void testMergeBagIntoNewNamespace() throws Exception {
-    BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
-    BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR);
-    BagState<String> bag3 = underTest.state(NAMESPACE_3, STRING_BAG_ADDR);
-
-    bag1.add("Hello");
-    bag2.add("World");
-    bag1.add("!");
-
-    StateMerging.mergeBags(Arrays.asList(bag1, bag2, bag3), bag3);
-
-    // Reading the merged bag gets both the contents
-    assertThat(bag3.read(), Matchers.containsInAnyOrder("Hello", "World", "!"));
-    assertThat(bag1.read(), Matchers.emptyIterable());
-    assertThat(bag2.read(), Matchers.emptyIterable());
-  }
-
-  @Test
-  public void testCombiningValue() throws Exception {
-    GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
-
-    // State instances are cached, but depend on the namespace.
-    assertEquals(value, underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR));
-    assertFalse(value.equals(underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR)));
-
-    assertThat(value.read(), Matchers.equalTo(0));
-    value.add(2);
-    assertThat(value.read(), Matchers.equalTo(2));
-
-    value.add(3);
-    assertThat(value.read(), Matchers.equalTo(5));
-
-    value.clear();
-    assertThat(value.read(), Matchers.equalTo(0));
-    assertEquals(underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR), value);
-  }
-
-  @Test
-  public void testCombiningIsEmpty() throws Exception {
-    GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
-
-    assertThat(value.isEmpty().read(), Matchers.is(true));
-    ReadableState<Boolean> readFuture = value.isEmpty();
-    value.add(5);
-    assertThat(readFuture.read(), Matchers.is(false));
-
-    value.clear();
-    assertThat(readFuture.read(), Matchers.is(true));
-  }
-
-  @Test
-  public void testMergeCombiningValueIntoSource() throws Exception {
-    CombiningState<Integer, int[], Integer> value1 =
-        underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
-    CombiningState<Integer, int[], Integer> value2 =
-        underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
-
-    value1.add(5);
-    value2.add(10);
-    value1.add(6);
-
-    assertThat(value1.read(), Matchers.equalTo(11));
-    assertThat(value2.read(), Matchers.equalTo(10));
-
-    // Merging clears the old values and updates the result value.
-    StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value1);
-
-    assertThat(value1.read(), Matchers.equalTo(21));
-    assertThat(value2.read(), Matchers.equalTo(0));
-  }
-
-  @Test
-  public void testMergeCombiningValueIntoNewNamespace() throws Exception {
-    CombiningState<Integer, int[], Integer> value1 =
-        underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
-    CombiningState<Integer, int[], Integer> value2 =
-        underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
-    CombiningState<Integer, int[], Integer> value3 =
-        underTest.state(NAMESPACE_3, SUM_INTEGER_ADDR);
-
-    value1.add(5);
-    value2.add(10);
-    value1.add(6);
-
-    StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value3);
-
-    // Merging clears the old values and updates the result value.
-    assertThat(value1.read(), Matchers.equalTo(0));
-    assertThat(value2.read(), Matchers.equalTo(0));
-    assertThat(value3.read(), Matchers.equalTo(21));
-  }
-
-  @Test
-  public void testWatermarkEarliestState() throws Exception {
-    WatermarkHoldState value =
-        underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
-
-    // State instances are cached, but depend on the namespace.
-    assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR));
-    assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR)));
-
-    assertThat(value.read(), Matchers.nullValue());
-    value.add(new Instant(2000));
-    assertThat(value.read(), Matchers.equalTo(new Instant(2000)));
-
-    value.add(new Instant(3000));
-    assertThat(value.read(), Matchers.equalTo(new Instant(2000)));
-
-    value.add(new Instant(1000));
-    assertThat(value.read(), Matchers.equalTo(new Instant(1000)));
-
-    value.clear();
-    assertThat(value.read(), Matchers.equalTo(null));
-    assertEquals(underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR), value);
-  }
-
-  @Test
-  public void testWatermarkLatestState() throws Exception {
-    WatermarkHoldState value =
-        underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
-
-    // State instances are cached, but depend on the namespace.
-    assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR));
-    assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR)));
-
-    assertThat(value.read(), Matchers.nullValue());
-    value.add(new Instant(2000));
-    assertThat(value.read(), Matchers.equalTo(new Instant(2000)));
-
-    value.add(new Instant(3000));
-    assertThat(value.read(), Matchers.equalTo(new Instant(3000)));
-
-    value.add(new Instant(1000));
-    assertThat(value.read(), Matchers.equalTo(new Instant(3000)));
-
-    value.clear();
-    assertThat(value.read(), Matchers.equalTo(null));
-    assertEquals(underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR), value);
-  }
-
-  @Test
-  public void testWatermarkEndOfWindowState() throws Exception {
-    WatermarkHoldState value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR);
-
-    // State instances are cached, but depend on the namespace.
-    assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR));
-    assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_EOW_ADDR)));
-
-    assertThat(value.read(), Matchers.nullValue());
-    value.add(new Instant(2000));
-    assertThat(value.read(), Matchers.equalTo(new Instant(2000)));
-
-    value.clear();
-    assertThat(value.read(), Matchers.equalTo(null));
-    assertEquals(underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR), value);
-  }
-
-  @Test
-  public void testWatermarkStateIsEmpty() throws Exception {
-    WatermarkHoldState value =
-        underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
-
-    assertThat(value.isEmpty().read(), Matchers.is(true));
-    ReadableState<Boolean> readFuture = value.isEmpty();
-    value.add(new Instant(1000));
-    assertThat(readFuture.read(), Matchers.is(false));
-
-    value.clear();
-    assertThat(readFuture.read(), Matchers.is(true));
-  }
-
-  @Test
-  public void testMergeEarliestWatermarkIntoSource() throws Exception {
-    WatermarkHoldState value1 =
-        underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
-    WatermarkHoldState value2 =
-        underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR);
-
-    value1.add(new Instant(3000));
-    value2.add(new Instant(5000));
-    value1.add(new Instant(4000));
-    value2.add(new Instant(2000));
-
-    // Merging clears the old values and updates the merged value.
-    StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value1, WINDOW_1);
-
-    assertThat(value1.read(), Matchers.equalTo(new Instant(2000)));
-    assertThat(value2.read(), Matchers.equalTo(null));
-  }
-
-  @Test
-  public void testMergeLatestWatermarkIntoSource() throws Exception {
-    WatermarkHoldState value1 =
-        underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
-    WatermarkHoldState value2 =
-        underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR);
-    WatermarkHoldState value3 =
-        underTest.state(NAMESPACE_3, WATERMARK_LATEST_ADDR);
-
-    value1.add(new Instant(3000));
-    value2.add(new Instant(5000));
-    value1.add(new Instant(4000));
-    value2.add(new Instant(2000));
-
-    // Merging clears the old values and updates the result value.
-    StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value3, WINDOW_1);
-
-    // Merging clears the old values and updates the result value.
-    assertThat(value3.read(), Matchers.equalTo(new Instant(5000)));
-    assertThat(value1.read(), Matchers.equalTo(null));
-    assertThat(value2.read(), Matchers.equalTo(null));
-  }
-
-  @Test
-  public void testSerialization() throws Exception {
-    ApexStateInternalsFactory<String> sif = new ApexStateBackend().
-        newStateInternalsFactory(StringUtf8Coder.of());
-    ApexStateInternals<String> keyAndState = sif.stateInternalsForKey("dummy");
-
-    ValueState<String> value = keyAndState.state(NAMESPACE_1, STRING_VALUE_ADDR);
-    assertEquals(keyAndState.state(NAMESPACE_1, STRING_VALUE_ADDR), value);
-    value.write("hello");
-
-    ApexStateInternalsFactory<String> cloned;
-    assertNotNull("Serialization", cloned = KryoCloneUtils.cloneObject(sif));
-    ApexStateInternals<String> clonedKeyAndState = cloned.stateInternalsForKey("dummy");
-
-    ValueState<String> clonedValue = clonedKeyAndState.state(NAMESPACE_1, STRING_VALUE_ADDR);
-    assertThat(clonedValue.read(), Matchers.equalTo("hello"));
-    assertEquals(clonedKeyAndState.state(NAMESPACE_1, STRING_VALUE_ADDR), value);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/eb860388/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternalsTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternalsTest.java
index 1d7e586..ee142e2 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternalsTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternalsTest.java
@@ -22,8 +22,12 @@ import static org.junit.Assert.assertNotNull;
 
 import com.datatorrent.lib.util.KryoCloneUtils;
 import com.datatorrent.netlet.util.Slice;
-import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import org.apache.beam.runners.apex.translation.operators.ApexTimerInternals.TimerProcessor;
 import org.apache.beam.runners.core.StateNamespaces;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.runners.core.TimerInternals.TimerDataCoder;
@@ -40,6 +44,15 @@ public class ApexTimerInternalsTest {
 
   @Test
   public void testEventTimeTimers() {
+
+    final Map<String, Collection<TimerData>> firedTimers = new HashMap<>();
+    TimerProcessor<String> timerProcessor = new TimerProcessor<String>() {
+      @Override
+      public void fireTimer(String key, Collection<TimerData> timerData) {
+        firedTimers.put(key, timerData);
+      }
+    };
+
     TimerDataCoder timerDataCoder = TimerDataCoder.of(GlobalWindow.Coder.INSTANCE);
     String key1 = "key1";
     Instant instant0 = new Instant(0);
@@ -57,17 +70,60 @@ public class ApexTimerInternalsTest {
         instant1, TimeDomain.EVENT_TIME);
     timerInternals.setTimer(timerData1);
 
-    Multimap<Slice, TimerData> timers = timerInternals.getTimersReadyToProcess(
-        instant0.getMillis());
-    assertEquals(0, timers.size());
+    timerInternals.fireReadyTimers(instant0.getMillis(), timerProcessor, TimeDomain.EVENT_TIME);
+    assertEquals(0, firedTimers.size());
+    firedTimers.clear();
 
-    timers = timerInternals.getTimersReadyToProcess(instant1.getMillis());
-    assertEquals(1, timers.size());
-    assertEquals(Sets.newHashSet(timerData0), Sets.newHashSet(timers.values()));
+    timerInternals.fireReadyTimers(instant1.getMillis(), timerProcessor,
+        TimeDomain.PROCESSING_TIME);
+    assertEquals(0, firedTimers.size());
+    timerInternals.fireReadyTimers(instant1.getMillis(), timerProcessor, TimeDomain.EVENT_TIME);
+    assertEquals(1, firedTimers.size());
+    assertEquals(Sets.newHashSet(timerData0),
+        Sets.newHashSet(firedTimers.values().iterator().next()));
+    firedTimers.clear();
 
-    timers = timerInternals.getTimersReadyToProcess(instant2.getMillis());
-    assertEquals(1, timers.size());
-    assertEquals(Sets.newHashSet(timerData1), Sets.newHashSet(timers.values()));
+    timerInternals.fireReadyTimers(instant2.getMillis(), timerProcessor, TimeDomain.EVENT_TIME);
+    assertEquals(1, firedTimers.size());
+    assertEquals(Sets.newHashSet(timerData1),
+        Sets.newHashSet(firedTimers.values().iterator().next()));
+    firedTimers.clear();
+  }
+
+  @Test
+  public void testDeleteTimer() {
+    TimerDataCoder timerDataCoder = TimerDataCoder.of(GlobalWindow.Coder.INSTANCE);
+    String key1 = "key1";
+    Instant instant0 = new Instant(0);
+    Instant instant1 = new Instant(1);
+
+    ApexTimerInternals<String> timerInternals = new ApexTimerInternals<>(timerDataCoder);
+    timerInternals.setContext(key1, StringUtf8Coder.of(), Instant.now());
+
+    TimerData timerData0 = TimerData.of("timerData0", StateNamespaces.global(),
+        instant0, TimeDomain.EVENT_TIME);
+    timerInternals.setTimer(timerData0);
+
+    TimerData timerData1 = TimerData.of("timerData1", StateNamespaces.global(),
+        instant1, TimeDomain.EVENT_TIME);
+    timerInternals.setTimer(timerData1);
+
+    Map<?, Set<Slice>> timerMap = timerInternals.getTimerSet(TimeDomain.EVENT_TIME).getMap();
+    assertEquals(1, timerMap.size());
+    assertEquals(2, timerMap.values().iterator().next().size());
+
+    timerInternals.deleteTimer(timerData0.getNamespace(), timerData0.getTimerId());
+    assertEquals(1, timerMap.size());
+    assertEquals(1, timerMap.values().iterator().next().size());
+
+    timerInternals.deleteTimer(timerData1.getNamespace(), timerData1.getTimerId(),
+        TimeDomain.PROCESSING_TIME);
+    assertEquals(1, timerMap.size());
+    assertEquals(1, timerMap.values().iterator().next().size());
+
+    timerInternals.deleteTimer(timerData1.getNamespace(), timerData1.getTimerId(),
+        TimeDomain.EVENT_TIME);
+    assertEquals(0, timerMap.size());
   }
 
   @Test
@@ -82,7 +138,7 @@ public class ApexTimerInternalsTest {
     ApexTimerInternals<String> cloned;
     assertNotNull("Serialization", cloned = KryoCloneUtils.cloneObject(timerInternals));
     cloned.setContext(key, StringUtf8Coder.of(), Instant.now());
-    Multimap<Slice, TimerData> timers = cloned.getTimersReadyToProcess(new Instant(1).getMillis());
+    Map<?, Set<Slice>> timers = cloned.getTimerSet(TimeDomain.EVENT_TIME).getMap();
     assertEquals(1, timers.size());
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/eb860388/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
new file mode 100644
index 0000000..225b654
--- /dev/null
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.apex.translation.utils;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+
+import com.datatorrent.lib.util.KryoCloneUtils;
+import java.util.Arrays;
+import org.apache.beam.runners.apex.translation.utils.ApexStateInternals.ApexStateBackend;
+import org.apache.beam.runners.apex.translation.utils.ApexStateInternals.ApexStateInternalsFactory;
+import org.apache.beam.runners.core.StateMerging;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.StateNamespaceForTest;
+import org.apache.beam.runners.core.StateTag;
+import org.apache.beam.runners.core.StateTags;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
+import org.apache.beam.sdk.util.state.GroupingState;
+import org.apache.beam.sdk.util.state.ReadableState;
+import org.apache.beam.sdk.util.state.ValueState;
+import org.apache.beam.sdk.util.state.WatermarkHoldState;
+import org.hamcrest.Matchers;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for {@link ApexStateInternals}. This is based on the tests for
+ * {@code InMemoryStateInternals}.
+ */
+public class ApexStateInternalsTest {
+  private static final BoundedWindow WINDOW_1 = new IntervalWindow(new Instant(0), new Instant(10));
+  private static final StateNamespace NAMESPACE_1 = new StateNamespaceForTest("ns1");
+  private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2");
+  private static final StateNamespace NAMESPACE_3 = new StateNamespaceForTest("ns3");
+
+  private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR =
+      StateTags.value("stringValue", StringUtf8Coder.of());
+  private static final StateTag<Object, CombiningState<Integer, int[], Integer>>
+      SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal(
+          "sumInteger", VarIntCoder.of(), Sum.ofIntegers());
+  private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
+      StateTags.bag("stringBag", StringUtf8Coder.of());
+  private static final StateTag<Object, WatermarkHoldState>
+      WATERMARK_EARLIEST_ADDR =
+      StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST);
+  private static final StateTag<Object, WatermarkHoldState> WATERMARK_LATEST_ADDR =
+      StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST);
+  private static final StateTag<Object, WatermarkHoldState> WATERMARK_EOW_ADDR =
+      StateTags.watermarkStateInternal("watermark", TimestampCombiner.END_OF_WINDOW);
+
+  private ApexStateInternals<String> underTest;
+
+  @Before
+  public void initStateInternals() {
+    underTest = new ApexStateInternals.ApexStateBackend()
+        .newStateInternalsFactory(StringUtf8Coder.of())
+        .stateInternalsForKey((String) null);
+  }
+
+  @Test
+  public void testBag() throws Exception {
+    BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
+
+    assertEquals(value, underTest.state(NAMESPACE_1, STRING_BAG_ADDR));
+    assertFalse(value.equals(underTest.state(NAMESPACE_2, STRING_BAG_ADDR)));
+
+    assertThat(value.read(), Matchers.emptyIterable());
+    value.add("hello");
+    assertThat(value.read(), Matchers.containsInAnyOrder("hello"));
+
+    value.add("world");
+    assertThat(value.read(), Matchers.containsInAnyOrder("hello", "world"));
+
+    value.clear();
+    assertThat(value.read(), Matchers.emptyIterable());
+    assertEquals(underTest.state(NAMESPACE_1, STRING_BAG_ADDR), value);
+
+  }
+
+  @Test
+  public void testBagIsEmpty() throws Exception {
+    BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
+
+    assertThat(value.isEmpty().read(), Matchers.is(true));
+    ReadableState<Boolean> readFuture = value.isEmpty();
+    value.add("hello");
+    assertThat(readFuture.read(), Matchers.is(false));
+
+    value.clear();
+    assertThat(readFuture.read(), Matchers.is(true));
+  }
+
+  @Test
+  public void testMergeBagIntoSource() throws Exception {
+    BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
+    BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR);
+
+    bag1.add("Hello");
+    bag2.add("World");
+    bag1.add("!");
+
+    StateMerging.mergeBags(Arrays.asList(bag1, bag2), bag1);
+
+    // Reading the merged bag returns the contents of both source bags.
+    assertThat(bag1.read(), Matchers.containsInAnyOrder("Hello", "World", "!"));
+    assertThat(bag2.read(), Matchers.emptyIterable());
+  }
+
+  @Test
+  public void testMergeBagIntoNewNamespace() throws Exception {
+    BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
+    BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR);
+    BagState<String> bag3 = underTest.state(NAMESPACE_3, STRING_BAG_ADDR);
+
+    bag1.add("Hello");
+    bag2.add("World");
+    bag1.add("!");
+
+    StateMerging.mergeBags(Arrays.asList(bag1, bag2, bag3), bag3);
+
+    // Reading the merged bag returns the combined contents of the source bags.
+    assertThat(bag3.read(), Matchers.containsInAnyOrder("Hello", "World", "!"));
+    assertThat(bag1.read(), Matchers.emptyIterable());
+    assertThat(bag2.read(), Matchers.emptyIterable());
+  }
+
+  @Test
+  public void testCombiningValue() throws Exception {
+    GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+
+    // State instances are cached, but depend on the namespace.
+    assertEquals(value, underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR));
+    assertFalse(value.equals(underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR)));
+
+    assertThat(value.read(), Matchers.equalTo(0));
+    value.add(2);
+    assertThat(value.read(), Matchers.equalTo(2));
+
+    value.add(3);
+    assertThat(value.read(), Matchers.equalTo(5));
+
+    value.clear();
+    assertThat(value.read(), Matchers.equalTo(0));
+    assertEquals(underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR), value);
+  }
+
+  @Test
+  public void testCombiningIsEmpty() throws Exception {
+    GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+
+    assertThat(value.isEmpty().read(), Matchers.is(true));
+    ReadableState<Boolean> readFuture = value.isEmpty();
+    value.add(5);
+    assertThat(readFuture.read(), Matchers.is(false));
+
+    value.clear();
+    assertThat(readFuture.read(), Matchers.is(true));
+  }
+
+  @Test
+  public void testMergeCombiningValueIntoSource() throws Exception {
+    CombiningState<Integer, int[], Integer> value1 =
+        underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+    CombiningState<Integer, int[], Integer> value2 =
+        underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
+
+    value1.add(5);
+    value2.add(10);
+    value1.add(6);
+
+    assertThat(value1.read(), Matchers.equalTo(11));
+    assertThat(value2.read(), Matchers.equalTo(10));
+
+    // Merging clears the old values and updates the result value.
+    StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value1);
+
+    assertThat(value1.read(), Matchers.equalTo(21));
+    assertThat(value2.read(), Matchers.equalTo(0));
+  }
+
+  @Test
+  public void testMergeCombiningValueIntoNewNamespace() throws Exception {
+    CombiningState<Integer, int[], Integer> value1 =
+        underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+    CombiningState<Integer, int[], Integer> value2 =
+        underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
+    CombiningState<Integer, int[], Integer> value3 =
+        underTest.state(NAMESPACE_3, SUM_INTEGER_ADDR);
+
+    value1.add(5);
+    value2.add(10);
+    value1.add(6);
+
+    StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value3);
+
+    // Merging clears the old values and updates the result value.
+    assertThat(value1.read(), Matchers.equalTo(0));
+    assertThat(value2.read(), Matchers.equalTo(0));
+    assertThat(value3.read(), Matchers.equalTo(21));
+  }
+
+  @Test
+  public void testWatermarkEarliestState() throws Exception {
+    WatermarkHoldState value =
+        underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
+
+    // State instances are cached, but depend on the namespace.
+    assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR));
+    assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR)));
+
+    assertThat(value.read(), Matchers.nullValue());
+    value.add(new Instant(2000));
+    assertThat(value.read(), Matchers.equalTo(new Instant(2000)));
+
+    value.add(new Instant(3000));
+    assertThat(value.read(), Matchers.equalTo(new Instant(2000)));
+
+    value.add(new Instant(1000));
+    assertThat(value.read(), Matchers.equalTo(new Instant(1000)));
+
+    value.clear();
+    assertThat(value.read(), Matchers.equalTo(null));
+    assertEquals(underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR), value);
+  }
+
+  @Test
+  public void testWatermarkLatestState() throws Exception {
+    WatermarkHoldState value =
+        underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
+
+    // State instances are cached, but depend on the namespace.
+    assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR));
+    assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR)));
+
+    assertThat(value.read(), Matchers.nullValue());
+    value.add(new Instant(2000));
+    assertThat(value.read(), Matchers.equalTo(new Instant(2000)));
+
+    value.add(new Instant(3000));
+    assertThat(value.read(), Matchers.equalTo(new Instant(3000)));
+
+    value.add(new Instant(1000));
+    assertThat(value.read(), Matchers.equalTo(new Instant(3000)));
+
+    value.clear();
+    assertThat(value.read(), Matchers.equalTo(null));
+    assertEquals(underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR), value);
+  }
+
+  @Test
+  public void testWatermarkEndOfWindowState() throws Exception {
+    WatermarkHoldState value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR);
+
+    // State instances are cached, but depend on the namespace.
+    assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR));
+    assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_EOW_ADDR)));
+
+    assertThat(value.read(), Matchers.nullValue());
+    value.add(new Instant(2000));
+    assertThat(value.read(), Matchers.equalTo(new Instant(2000)));
+
+    value.clear();
+    assertThat(value.read(), Matchers.equalTo(null));
+    assertEquals(underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR), value);
+  }
+
+  @Test
+  public void testWatermarkStateIsEmpty() throws Exception {
+    WatermarkHoldState value =
+        underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
+
+    assertThat(value.isEmpty().read(), Matchers.is(true));
+    ReadableState<Boolean> readFuture = value.isEmpty();
+    value.add(new Instant(1000));
+    assertThat(readFuture.read(), Matchers.is(false));
+
+    value.clear();
+    assertThat(readFuture.read(), Matchers.is(true));
+  }
+
+  @Test
+  public void testMergeEarliestWatermarkIntoSource() throws Exception {
+    WatermarkHoldState value1 =
+        underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
+    WatermarkHoldState value2 =
+        underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR);
+
+    value1.add(new Instant(3000));
+    value2.add(new Instant(5000));
+    value1.add(new Instant(4000));
+    value2.add(new Instant(2000));
+
+    // Merging clears the old values and updates the merged value.
+    StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value1, WINDOW_1);
+
+    assertThat(value1.read(), Matchers.equalTo(new Instant(2000)));
+    assertThat(value2.read(), Matchers.equalTo(null));
+  }
+
+  @Test
+  public void testMergeLatestWatermarkIntoSource() throws Exception {
+    WatermarkHoldState value1 =
+        underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
+    WatermarkHoldState value2 =
+        underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR);
+    WatermarkHoldState value3 =
+        underTest.state(NAMESPACE_3, WATERMARK_LATEST_ADDR);
+
+    value1.add(new Instant(3000));
+    value2.add(new Instant(5000));
+    value1.add(new Instant(4000));
+    value2.add(new Instant(2000));
+
+    // Merging clears the old values and updates the result value.
+    StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value3, WINDOW_1);
+
+    // The result now holds the latest timestamp and the source holds read as null.
+    assertThat(value3.read(), Matchers.equalTo(new Instant(5000)));
+    assertThat(value1.read(), Matchers.equalTo(null));
+    assertThat(value2.read(), Matchers.equalTo(null));
+  }
+
+  @Test
+  public void testSerialization() throws Exception {
+    ApexStateInternalsFactory<String> sif = new ApexStateBackend().
+        newStateInternalsFactory(StringUtf8Coder.of());
+    ApexStateInternals<String> keyAndState = sif.stateInternalsForKey("dummy");
+
+    ValueState<String> value = keyAndState.state(NAMESPACE_1, STRING_VALUE_ADDR);
+    assertEquals(keyAndState.state(NAMESPACE_1, STRING_VALUE_ADDR), value);
+    value.write("hello");
+
+    ApexStateInternalsFactory<String> cloned;
+    assertNotNull("Serialization", cloned = KryoCloneUtils.cloneObject(sif));
+    ApexStateInternals<String> clonedKeyAndState = cloned.stateInternalsForKey("dummy");
+
+    ValueState<String> clonedValue = clonedKeyAndState.state(NAMESPACE_1, STRING_VALUE_ADDR);
+    assertThat(clonedValue.read(), Matchers.equalTo("hello"));
+    assertEquals(clonedKeyAndState.state(NAMESPACE_1, STRING_VALUE_ADDR), value);
+  }
+
+}


Mime
View raw message