beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [1/2] beam git commit: BEAM-1981 Fix ApexTimerInternals serialization error.
Date Fri, 28 Apr 2017 19:36:00 GMT
Repository: beam
Updated Branches:
  refs/heads/master f9406c766 -> 2bd668fdb


BEAM-1981 Fix ApexTimerInternals serialization error.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f37d9548
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f37d9548
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f37d9548

Branch: refs/heads/master
Commit: f37d954873ae03cdae854837570b62e334be9022
Parents: 185dc47
Author: Thomas Weise <thw@apache.org>
Authored: Thu Apr 27 02:06:50 2017 -0700
Committer: Thomas Weise <thw@apache.org>
Committed: Thu Apr 27 23:51:16 2017 -0700

----------------------------------------------------------------------
 .../operators/ApexGroupByKeyOperator.java       | 165 +--------
 .../operators/ApexParDoOperator.java            | 104 ++----
 .../operators/ApexTimerInternals.java           | 201 ++++++++++
 .../translation/ApexStateInternalsTest.java     | 369 +++++++++++++++++++
 .../operators/ApexTimerInternalsTest.java       |  89 +++++
 .../utils/ApexStateInternalsTest.java           | 368 ------------------
 6 files changed, 692 insertions(+), 604 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/f37d9548/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
index 7d17ac6..f8b6653 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
@@ -29,38 +29,28 @@ import com.datatorrent.netlet.util.Slice;
 import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
 import com.esotericsoftware.kryo.serializers.JavaSerializer;
 import com.google.common.base.Throwables;
-import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
 import org.apache.beam.runners.apex.ApexPipelineOptions;
 import org.apache.beam.runners.apex.translation.utils.ApexStateInternals.ApexStateBackend;
 import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple;
 import org.apache.beam.runners.apex.translation.utils.SerializablePipelineOptions;
 import org.apache.beam.runners.core.OutputWindowedValue;
 import org.apache.beam.runners.core.ReduceFnRunner;
-import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.StateInternalsFactory;
-import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.construction.Triggers;
 import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
 import org.apache.beam.runners.core.triggers.TriggerStateMachines;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.NullSideInputReader;
-import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
@@ -93,9 +83,7 @@ public class ApexGroupByKeyOperator<K, V> implements Operator {
   private final SerializablePipelineOptions serializedOptions;
   @Bind(JavaSerializer.class)
   private final StateInternalsFactory<K> stateInternalsFactory;
-  private Map<Slice, Set<TimerInternals.TimerData>> activeTimers = new HashMap<>();
-
-  private transient ApexTimerInternals timerInternals = new ApexTimerInternals();
+  private final ApexTimerInternals<K> timerInternals;
   private Instant inputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
 
   public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<KV<K, V>>>> input =
@@ -137,12 +125,16 @@ public class ApexGroupByKeyOperator<K, V> implements Operator {
     this.keyCoder = ((KvCoder<K, V>) input.getCoder()).getKeyCoder();
     this.valueCoder = ((KvCoder<K, V>) input.getCoder()).getValueCoder();
     this.stateInternalsFactory = stateBackend.newStateInternalsFactory(keyCoder);
+    TimerInternals.TimerDataCoder timerCoder =
+        TimerInternals.TimerDataCoder.of(windowingStrategy.getWindowFn().windowCoder());
+    this.timerInternals = new ApexTimerInternals<>(timerCoder);
   }
 
   @SuppressWarnings("unused") // for Kryo
   private ApexGroupByKeyOperator() {
     this.serializedOptions = null;
     this.stateInternalsFactory = null;
+    this.timerInternals = null;
   }
 
   @Override
@@ -203,40 +195,6 @@ public class ApexGroupByKeyOperator<K, V> implements Operator {
         serializedOptions.get());
   }
 
-  /**
-   * Returns the list of timers that are ready to fire. These are the timers
-   * that are registered to be triggered at a time before the current watermark.
-   * We keep these timers in a Set, so that they are deduplicated, as the same
-   * timer can be registered multiple times.
-   */
-  private Multimap<Slice, TimerInternals.TimerData> getTimersReadyToProcess(
-      long currentWatermark) {
-
-    // we keep the timers to return in a different list and launch them later
-    // because we cannot prevent a trigger from registering another trigger,
-    // which would lead to concurrent modification exception.
-    Multimap<Slice, TimerInternals.TimerData> toFire = HashMultimap.create();
-
-    Iterator<Map.Entry<Slice, Set<TimerInternals.TimerData>>> it =
-        activeTimers.entrySet().iterator();
-    while (it.hasNext()) {
-      Map.Entry<Slice, Set<TimerInternals.TimerData>> keyWithTimers = it.next();
-
-      Iterator<TimerInternals.TimerData> timerIt = keyWithTimers.getValue().iterator();
-      while (timerIt.hasNext()) {
-        TimerInternals.TimerData timerData = timerIt.next();
-        if (timerData.getTimestamp().isBefore(currentWatermark)) {
-          toFire.put(keyWithTimers.getKey(), timerData);
-          timerIt.remove();
-        }
-      }
-
-      if (keyWithTimers.getValue().isEmpty()) {
-        it.remove();
-      }
-    }
-    return toFire;
-  }
 
   private void processElement(WindowedValue<KV<K, V>> windowedValue) throws Exception {
     final KV<K, V> kv = windowedValue.getValue();
@@ -244,58 +202,21 @@ public class ApexGroupByKeyOperator<K, V> implements Operator {
         windowedValue.getTimestamp(),
         windowedValue.getWindows(),
         windowedValue.getPane());
-    timerInternals.setKey(kv.getKey());
+    timerInternals.setContext(kv.getKey(), this.keyCoder, this.inputWatermark);
     ReduceFnRunner<K, V, Iterable<V>, BoundedWindow> reduceFnRunner =
         newReduceFnRunner(kv.getKey());
     reduceFnRunner.processElements(Collections.singletonList(updatedWindowedValue));
     reduceFnRunner.persist();
   }
 
-  private StateInternals<K> getStateInternalsForKey(K key) {
-    return stateInternalsFactory.stateInternalsForKey(key);
-  }
-
-  private void registerActiveTimer(K key, TimerInternals.TimerData timer) {
-    final Slice keyBytes;
-    try {
-      keyBytes = new Slice(CoderUtils.encodeToByteArray(keyCoder, key));
-    } catch (CoderException e) {
-      throw new RuntimeException(e);
-    }
-    Set<TimerInternals.TimerData> timersForKey = activeTimers.get(keyBytes);
-    if (timersForKey == null) {
-      timersForKey = new HashSet<>();
-    }
-    timersForKey.add(timer);
-    activeTimers.put(keyBytes, timersForKey);
-  }
-
-  private void unregisterActiveTimer(K key, TimerInternals.TimerData timer) {
-    final Slice keyBytes;
-    try {
-      keyBytes = new Slice(CoderUtils.encodeToByteArray(keyCoder, key));
-    } catch (CoderException e) {
-      throw new RuntimeException(e);
-    }
-    Set<TimerInternals.TimerData> timersForKey = activeTimers.get(keyBytes);
-    if (timersForKey != null) {
-      timersForKey.remove(timer);
-      if (timersForKey.isEmpty()) {
-        activeTimers.remove(keyBytes);
-      } else {
-        activeTimers.put(keyBytes, timersForKey);
-      }
-    }
-  }
-
   private void processWatermark(ApexStreamTuple.WatermarkTuple<?> mark) throws Exception {
     this.inputWatermark = new Instant(mark.getTimestamp());
-    Multimap<Slice, TimerInternals.TimerData> timers = getTimersReadyToProcess(
+    Multimap<Slice, TimerInternals.TimerData> timers = timerInternals.getTimersReadyToProcess(
         mark.getTimestamp());
     if (!timers.isEmpty()) {
       for (Slice keyBytes : timers.keySet()) {
         K key = CoderUtils.decodeFromByteArray(keyCoder, keyBytes.buffer);
-        timerInternals.setKey(key);
+        timerInternals.setContext(key, keyCoder, inputWatermark);
         ReduceFnRunner<K, V, Iterable<V>, BoundedWindow> reduceFnRunner = newReduceFnRunner(key);
         reduceFnRunner.onTimers(timers.get(keyBytes));
         reduceFnRunner.persist();
@@ -303,74 +224,4 @@ public class ApexGroupByKeyOperator<K, V> implements Operator {
     }
   }
 
-  /**
-   * An implementation of Beam's {@link TimerInternals}.
-   *
-   */
-  private class ApexTimerInternals implements TimerInternals {
-    private K key;
-
-    public void setKey(K key) {
-      this.key = key;
-    }
-
-    @Deprecated
-    @Override
-    public void setTimer(TimerData timerData) {
-      registerActiveTimer(key, timerData);
-    }
-
-    @Override
-    public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) {
-      throw new UnsupportedOperationException("Canceling of timer by ID is not yet supported.");
-    }
-
-    @Deprecated
-    @Override
-    public void deleteTimer(TimerData timerKey) {
-      unregisterActiveTimer(key, timerKey);
-    }
-
-    @Override
-    public Instant currentProcessingTime() {
-      return Instant.now();
-    }
-
-    @Override
-    public Instant currentSynchronizedProcessingTime() {
-      // TODO Auto-generated method stub
-      return null;
-    }
-
-    @Override
-    public Instant currentInputWatermarkTime() {
-      return inputWatermark;
-    }
-
-    @Override
-    public Instant currentOutputWatermarkTime() {
-      // TODO Auto-generated method stub
-      return null;
-    }
-
-    @Override
-    public void setTimer(StateNamespace namespace, String timerId, Instant target,
-        TimeDomain timeDomain) {
-      throw new UnsupportedOperationException("Setting timer by ID not yet supported.");
-    }
-
-    @Deprecated
-    @Override
-    public void deleteTimer(StateNamespace namespace, String timerId) {
-      throw new UnsupportedOperationException("Canceling of timer by ID is not yet supported.");
-    }
-
-  }
-
-  private class GroupByKeyStateInternalsFactory implements StateInternalsFactory<K> {
-    @Override
-    public StateInternals<K> stateInternalsForKey(K key) {
-      return getStateInternalsForKey(key);
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/f37d9548/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
index f4c617d..9b5a75c 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
@@ -28,7 +28,6 @@ import com.esotericsoftware.kryo.serializers.JavaSerializer;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.runners.apex.ApexPipelineOptions;
@@ -42,17 +41,16 @@ import org.apache.beam.runners.apex.translation.utils.ValueAndCoderKryoSerializa
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
 import org.apache.beam.runners.core.DoFnRunners.OutputManager;
-import org.apache.beam.runners.core.InMemoryTimerInternals;
 import org.apache.beam.runners.core.KeyedWorkItem;
+import org.apache.beam.runners.core.KeyedWorkItemCoder;
 import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
 import org.apache.beam.runners.core.SideInputHandler;
 import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner;
 import org.apache.beam.runners.core.StateInternals;
-import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.runners.core.StatefulDoFnRunner;
 import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.runners.core.TimerInternalsFactory;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -60,9 +58,9 @@ import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
 import org.apache.beam.sdk.util.NullSideInputReader;
 import org.apache.beam.sdk.util.SideInputReader;
-import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -90,12 +88,11 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
   private final WindowingStrategy<?, ?> windowingStrategy;
   @Bind(JavaSerializer.class)
   private final List<PCollectionView<?>> sideInputs;
+  @Bind(JavaSerializer.class)
+  private final Coder<WindowedValue<InputT>> inputCoder;
 
   private StateInternalsProxy<?> currentKeyStateInternals;
-  // TODO: if the operator gets restored to checkpointed state due to a failure,
-  // the timer state is lost.
-  private final transient CurrentKeyTimerInternals<Object> currentKeyTimerInternals =
-      new CurrentKeyTimerInternals<>();
+  private final ApexTimerInternals<Object> currentKeyTimerInternals;
 
   private final StateInternals<Void> sideInputStateInternals;
   private final ValueAndCoderKryoSerializable<List<WindowedValue<InputT>>> pushedBack;
@@ -134,10 +131,14 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
       throw new UnsupportedOperationException(msg);
     }
 
-    Coder<List<WindowedValue<InputT>>> coder = ListCoder.of(inputCoder);
+    Coder<List<WindowedValue<InputT>>> listCoder = ListCoder.of(inputCoder);
     this.pushedBack = new ValueAndCoderKryoSerializable<>(new ArrayList<WindowedValue<InputT>>(),
-        coder);
+        listCoder);
+    this.inputCoder = inputCoder;
 
+    TimerInternals.TimerDataCoder timerCoder =
+        TimerInternals.TimerDataCoder.of(windowingStrategy.getWindowFn().windowCoder());
+    this.currentKeyTimerInternals = new ApexTimerInternals<>(timerCoder);
   }
 
   @SuppressWarnings("unused") // for Kryo
@@ -150,6 +151,8 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
     this.sideInputs = null;
     this.pushedBack = null;
     this.sideInputStateInternals = null;
+    this.inputCoder = null;
+    this.currentKeyTimerInternals = null;
   }
 
   public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>> input =
@@ -253,14 +256,23 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
       pushbackDoFnRunner.startBundle();
       if (currentKeyStateInternals != null) {
         InputT value = elem.getValue();
-        Object key;
+        final Object key;
+        final Coder<Object> keyCoder;
+        @SuppressWarnings({ "rawtypes", "unchecked" })
+        WindowedValueCoder<InputT> wvCoder = (WindowedValueCoder) inputCoder;
         if (value instanceof KeyedWorkItem) {
           key = ((KeyedWorkItem) value).key();
+          @SuppressWarnings({ "rawtypes", "unchecked" })
+          KeyedWorkItemCoder<Object, ?> kwiCoder = (KeyedWorkItemCoder) wvCoder.getValueCoder();
+          keyCoder = kwiCoder.getKeyCoder();
         } else {
           key = ((KV) value).getKey();
+          @SuppressWarnings({ "rawtypes", "unchecked" })
+          KvCoder<Object, ?> kwiCoder = (KvCoder) wvCoder.getValueCoder();
+          keyCoder = kwiCoder.getKeyCoder();
         }
         ((StateInternalsProxy) currentKeyStateInternals).setKey(key);
-        currentKeyTimerInternals.currentKey = key;
+        currentKeyTimerInternals.setContext(key, keyCoder, new Instant(this.currentInputWatermark));
       }
       Iterable<WindowedValue<InputT>> pushedBack = pushbackDoFnRunner
           .processElementInReadyWindows(elem);
@@ -400,70 +412,4 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
 
   }
 
-  private class CurrentKeyTimerInternals<K> implements TimerInternals {
-
-    private TimerInternalsFactory<K> factory = new TimerInternalsFactory<K>() {
-      @Override
-      public TimerInternals timerInternalsForKey(K key) {
-        InMemoryTimerInternals timerInternals = perKeyTimerInternals.get(key);
-        if (timerInternals == null) {
-          perKeyTimerInternals.put(key, timerInternals = new InMemoryTimerInternals());
-        }
-        return timerInternals;
-      }
-    };
-
-    // TODO: durable state store
-    final Map<K, InMemoryTimerInternals> perKeyTimerInternals = new HashMap<>();
-    private K currentKey;
-
-    @Override
-    public void setTimer(StateNamespace namespace, String timerId, Instant target,
-        TimeDomain timeDomain) {
-      factory.timerInternalsForKey(currentKey).setTimer(
-          namespace, timerId, target, timeDomain);
-    }
-
-    @Override
-    public void setTimer(TimerData timerData) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void deleteTimer(StateNamespace namespace, String timerId) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void deleteTimer(TimerData timerKey) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Instant currentProcessingTime() {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Instant currentSynchronizedProcessingTime() {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Instant currentInputWatermarkTime() {
-      return new Instant(currentInputWatermark);
-    }
-
-    @Override
-    public Instant currentOutputWatermarkTime() {
-      throw new UnsupportedOperationException();
-    }
-
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/f37d9548/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternals.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternals.java
new file mode 100644
index 0000000..b142095
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternals.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.apex.translation.operators;
+
+import com.datatorrent.netlet.util.Slice;
+import com.esotericsoftware.kryo.DefaultSerializer;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.joda.time.Instant;
+
+/**
+ * An implementation of Beam's {@link TimerInternals}.
+ *
+ * <p>Assumes that the current key is set prior to accessing the state.<br>
+ * This implementation stores timer data in heap memory and is serialized
+ * during checkpointing, it will only work with a small number of timers.
+ * @param <K>
+ */
+@DefaultSerializer(JavaSerializer.class)
+class ApexTimerInternals<K> implements TimerInternals, Serializable {
+
+  private Map<Slice, Set<Slice>> activeTimers = new HashMap<>();
+  private TimerDataCoder timerDataCoder;
+  private transient K currentKey;
+  private transient Instant currentInputWatermark;
+  private transient Coder<K> keyCoder;
+
+  public ApexTimerInternals(TimerDataCoder timerDataCoder) {
+    this.timerDataCoder = timerDataCoder;
+  }
+
+  public void setContext(K key, Coder<K> keyCoder, Instant inputWatermark) {
+    this.currentKey = key;
+    this.keyCoder = keyCoder;
+    this.currentInputWatermark = inputWatermark;
+  }
+
+  @Override
+  public void setTimer(StateNamespace namespace, String timerId, Instant target,
+      TimeDomain timeDomain) {
+    TimerData timerData = TimerData.of(timerId, namespace, target, timeDomain);
+    registerActiveTimer(currentKey, timerData);
+  }
+
+  @Override
+  public void setTimer(TimerData timerData) {
+    registerActiveTimer(currentKey, timerData);
+  }
+
+  @Override
+  public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void deleteTimer(StateNamespace namespace, String timerId) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void deleteTimer(TimerData timerKey) {
+    unregisterActiveTimer(currentKey, timerKey);
+  }
+
+  @Override
+  public Instant currentProcessingTime() {
+    return Instant.now();
+  }
+
+  @Override
+  public Instant currentSynchronizedProcessingTime() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Instant currentInputWatermarkTime() {
+    return new Instant(currentInputWatermark);
+  }
+
+  @Override
+  public Instant currentOutputWatermarkTime() {
+    return null;
+  }
+
+  /**
+   * Returns the list of timers that are ready to fire. These are the timers
+   * that are registered to be triggered at a time before the current watermark.
+   * We keep these timers in a Set, so that they are deduplicated, as the same
+   * timer can be registered multiple times.
+   */
+  public Multimap<Slice, TimerInternals.TimerData> getTimersReadyToProcess(
+      long currentWatermark) {
+
+    // we keep the timers to return in a different list and launch them later
+    // because we cannot prevent a trigger from registering another timer,
+    // which would lead to concurrent modification exception.
+    Multimap<Slice, TimerInternals.TimerData> toFire = HashMultimap.create();
+
+    Iterator<Map.Entry<Slice, Set<Slice>>> it =
+        activeTimers.entrySet().iterator();
+    while (it.hasNext()) {
+      Map.Entry<Slice, Set<Slice>> keyWithTimers = it.next();
+
+      Iterator<Slice> timerIt = keyWithTimers.getValue().iterator();
+      while (timerIt.hasNext()) {
+        try {
+          TimerData timerData = CoderUtils.decodeFromByteArray(timerDataCoder,
+              timerIt.next().buffer);
+          if (timerData.getTimestamp().isBefore(currentWatermark)) {
+            toFire.put(keyWithTimers.getKey(), timerData);
+            timerIt.remove();
+          }
+        } catch (CoderException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      if (keyWithTimers.getValue().isEmpty()) {
+        it.remove();
+      }
+    }
+    return toFire;
+  }
+
+  private void registerActiveTimer(K key, TimerData timer) {
+    final Slice keyBytes;
+    try {
+      keyBytes = new Slice(CoderUtils.encodeToByteArray(keyCoder, key));
+    } catch (CoderException e) {
+      throw new RuntimeException(e);
+    }
+    Set<Slice> timersForKey = activeTimers.get(keyBytes);
+    if (timersForKey == null) {
+      timersForKey = new HashSet<>();
+    }
+
+    try {
+      Slice timerBytes = new Slice(CoderUtils.encodeToByteArray(timerDataCoder, timer));
+      timersForKey.add(timerBytes);
+    } catch (CoderException e) {
+      throw new RuntimeException(e);
+    }
+
+    activeTimers.put(keyBytes, timersForKey);
+  }
+
+  private void unregisterActiveTimer(K key, TimerData timer) {
+    final Slice keyBytes;
+    try {
+      keyBytes = new Slice(CoderUtils.encodeToByteArray(keyCoder, key));
+    } catch (CoderException e) {
+      throw new RuntimeException(e);
+    }
+
+    Set<Slice> timersForKey = activeTimers.get(keyBytes);
+    if (timersForKey != null) {
+      try {
+        Slice timerBytes = new Slice(CoderUtils.encodeToByteArray(timerDataCoder, timer));
+        timersForKey.add(timerBytes);
+        timersForKey.remove(timerBytes);
+      } catch (CoderException e) {
+        throw new RuntimeException(e);
+      }
+
+      if (timersForKey.isEmpty()) {
+        activeTimers.remove(keyBytes);
+      } else {
+        activeTimers.put(keyBytes, timersForKey);
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f37d9548/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexStateInternalsTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexStateInternalsTest.java
new file mode 100644
index 0000000..4021c62
--- /dev/null
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexStateInternalsTest.java
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.apex.translation;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+
+import com.datatorrent.lib.util.KryoCloneUtils;
+import java.util.Arrays;
+import org.apache.beam.runners.apex.translation.utils.ApexStateInternals;
+import org.apache.beam.runners.apex.translation.utils.ApexStateInternals.ApexStateBackend;
+import org.apache.beam.runners.apex.translation.utils.ApexStateInternals.ApexStateInternalsFactory;
+import org.apache.beam.runners.core.StateMerging;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.StateNamespaceForTest;
+import org.apache.beam.runners.core.StateTag;
+import org.apache.beam.runners.core.StateTags;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
+import org.apache.beam.sdk.util.state.GroupingState;
+import org.apache.beam.sdk.util.state.ReadableState;
+import org.apache.beam.sdk.util.state.ValueState;
+import org.apache.beam.sdk.util.state.WatermarkHoldState;
+import org.hamcrest.Matchers;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for {@link ApexStateInternals}. This is based on the tests for
+ * {@code InMemoryStateInternals}.
+ */
+public class ApexStateInternalsTest {
+  private static final BoundedWindow WINDOW_1 = new IntervalWindow(new Instant(0), new Instant(10));
+  private static final StateNamespace NAMESPACE_1 = new StateNamespaceForTest("ns1");
+  private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2");
+  private static final StateNamespace NAMESPACE_3 = new StateNamespaceForTest("ns3");
+
+  private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR =
+      StateTags.value("stringValue", StringUtf8Coder.of());
+  private static final StateTag<Object, CombiningState<Integer, int[], Integer>>
+      SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal(
+          "sumInteger", VarIntCoder.of(), Sum.ofIntegers());
+  private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
+      StateTags.bag("stringBag", StringUtf8Coder.of());
+  private static final StateTag<Object, WatermarkHoldState<BoundedWindow>>
+      WATERMARK_EARLIEST_ADDR =
+      StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEarliestInputTimestamp());
+  private static final StateTag<Object, WatermarkHoldState<BoundedWindow>>
+      WATERMARK_LATEST_ADDR =
+      StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtLatestInputTimestamp());
+  private static final StateTag<Object, WatermarkHoldState<BoundedWindow>> WATERMARK_EOW_ADDR =
+      StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEndOfWindow());
+
+  private ApexStateInternals<String> underTest;
+
+  @Before
+  public void initStateInternals() {
+    underTest = new ApexStateInternals.ApexStateBackend()
+        .newStateInternalsFactory(StringUtf8Coder.of())
+        .stateInternalsForKey((String) null);
+  }
+
+  @Test
+  public void testBag() throws Exception {
+    BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
+
+    assertEquals(value, underTest.state(NAMESPACE_1, STRING_BAG_ADDR));
+    assertFalse(value.equals(underTest.state(NAMESPACE_2, STRING_BAG_ADDR)));
+
+    assertThat(value.read(), Matchers.emptyIterable());
+    value.add("hello");
+    assertThat(value.read(), Matchers.containsInAnyOrder("hello"));
+
+    value.add("world");
+    assertThat(value.read(), Matchers.containsInAnyOrder("hello", "world"));
+
+    value.clear();
+    assertThat(value.read(), Matchers.emptyIterable());
+    assertEquals(underTest.state(NAMESPACE_1, STRING_BAG_ADDR), value);
+
+  }
+
+  @Test
+  public void testBagIsEmpty() throws Exception {
+    BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
+
+    assertThat(value.isEmpty().read(), Matchers.is(true));
+    ReadableState<Boolean> readFuture = value.isEmpty();
+    value.add("hello");
+    assertThat(readFuture.read(), Matchers.is(false));
+
+    value.clear();
+    assertThat(readFuture.read(), Matchers.is(true));
+  }
+
+  @Test
+  public void testMergeBagIntoSource() throws Exception {
+    BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
+    BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR);
+
+    bag1.add("Hello");
+    bag2.add("World");
+    bag1.add("!");
+
+    StateMerging.mergeBags(Arrays.asList(bag1, bag2), bag1);
+
+    // Reading the merged bag gets both the contents
+    assertThat(bag1.read(), Matchers.containsInAnyOrder("Hello", "World", "!"));
+    assertThat(bag2.read(), Matchers.emptyIterable());
+  }
+
+  @Test
+  public void testMergeBagIntoNewNamespace() throws Exception {
+    BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
+    BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR);
+    BagState<String> bag3 = underTest.state(NAMESPACE_3, STRING_BAG_ADDR);
+
+    bag1.add("Hello");
+    bag2.add("World");
+    bag1.add("!");
+
+    StateMerging.mergeBags(Arrays.asList(bag1, bag2, bag3), bag3);
+
+    // Reading the merged bag gets both the contents
+    assertThat(bag3.read(), Matchers.containsInAnyOrder("Hello", "World", "!"));
+    assertThat(bag1.read(), Matchers.emptyIterable());
+    assertThat(bag2.read(), Matchers.emptyIterable());
+  }
+
+  @Test
+  public void testCombiningValue() throws Exception {
+    GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+
+    // State instances are cached, but depend on the namespace.
+    assertEquals(value, underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR));
+    assertFalse(value.equals(underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR)));
+
+    assertThat(value.read(), Matchers.equalTo(0));
+    value.add(2);
+    assertThat(value.read(), Matchers.equalTo(2));
+
+    value.add(3);
+    assertThat(value.read(), Matchers.equalTo(5));
+
+    value.clear();
+    assertThat(value.read(), Matchers.equalTo(0));
+    assertEquals(underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR), value);
+  }
+
+  @Test
+  public void testCombiningIsEmpty() throws Exception {
+    GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+
+    assertThat(value.isEmpty().read(), Matchers.is(true));
+    ReadableState<Boolean> readFuture = value.isEmpty();
+    value.add(5);
+    assertThat(readFuture.read(), Matchers.is(false));
+
+    value.clear();
+    assertThat(readFuture.read(), Matchers.is(true));
+  }
+
+  @Test
+  public void testMergeCombiningValueIntoSource() throws Exception {
+    CombiningState<Integer, int[], Integer> value1 =
+        underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+    CombiningState<Integer, int[], Integer> value2 =
+        underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
+
+    value1.add(5);
+    value2.add(10);
+    value1.add(6);
+
+    assertThat(value1.read(), Matchers.equalTo(11));
+    assertThat(value2.read(), Matchers.equalTo(10));
+
+    // Merging clears the old values and updates the result value.
+    StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value1);
+
+    assertThat(value1.read(), Matchers.equalTo(21));
+    assertThat(value2.read(), Matchers.equalTo(0));
+  }
+
+  @Test
+  public void testMergeCombiningValueIntoNewNamespace() throws Exception {
+    CombiningState<Integer, int[], Integer> value1 =
+        underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+    CombiningState<Integer, int[], Integer> value2 =
+        underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
+    CombiningState<Integer, int[], Integer> value3 =
+        underTest.state(NAMESPACE_3, SUM_INTEGER_ADDR);
+
+    value1.add(5);
+    value2.add(10);
+    value1.add(6);
+
+    StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value3);
+
+    // Merging clears the old values and updates the result value.
+    assertThat(value1.read(), Matchers.equalTo(0));
+    assertThat(value2.read(), Matchers.equalTo(0));
+    assertThat(value3.read(), Matchers.equalTo(21));
+  }
+
+  @Test
+  public void testWatermarkEarliestState() throws Exception {
+    WatermarkHoldState<BoundedWindow> value =
+        underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
+
+    // State instances are cached, but depend on the namespace.
+    assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR));
+    assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR)));
+
+    assertThat(value.read(), Matchers.nullValue());
+    value.add(new Instant(2000));
+    assertThat(value.read(), Matchers.equalTo(new Instant(2000)));
+
+    value.add(new Instant(3000));
+    assertThat(value.read(), Matchers.equalTo(new Instant(2000)));
+
+    value.add(new Instant(1000));
+    assertThat(value.read(), Matchers.equalTo(new Instant(1000)));
+
+    value.clear();
+    assertThat(value.read(), Matchers.equalTo(null));
+    assertEquals(underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR), value);
+  }
+
+  @Test
+  public void testWatermarkLatestState() throws Exception {
+    WatermarkHoldState<BoundedWindow> value =
+        underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
+
+    // State instances are cached, but depend on the namespace.
+    assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR));
+    assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR)));
+
+    assertThat(value.read(), Matchers.nullValue());
+    value.add(new Instant(2000));
+    assertThat(value.read(), Matchers.equalTo(new Instant(2000)));
+
+    value.add(new Instant(3000));
+    assertThat(value.read(), Matchers.equalTo(new Instant(3000)));
+
+    value.add(new Instant(1000));
+    assertThat(value.read(), Matchers.equalTo(new Instant(3000)));
+
+    value.clear();
+    assertThat(value.read(), Matchers.equalTo(null));
+    assertEquals(underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR), value);
+  }
+
+  @Test
+  public void testWatermarkEndOfWindowState() throws Exception {
+    WatermarkHoldState<BoundedWindow> value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR);
+
+    // State instances are cached, but depend on the namespace.
+    assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR));
+    assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_EOW_ADDR)));
+
+    assertThat(value.read(), Matchers.nullValue());
+    value.add(new Instant(2000));
+    assertThat(value.read(), Matchers.equalTo(new Instant(2000)));
+
+    value.clear();
+    assertThat(value.read(), Matchers.equalTo(null));
+    assertEquals(underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR), value);
+  }
+
+  @Test
+  public void testWatermarkStateIsEmpty() throws Exception {
+    WatermarkHoldState<BoundedWindow> value =
+        underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
+
+    assertThat(value.isEmpty().read(), Matchers.is(true));
+    ReadableState<Boolean> readFuture = value.isEmpty();
+    value.add(new Instant(1000));
+    assertThat(readFuture.read(), Matchers.is(false));
+
+    value.clear();
+    assertThat(readFuture.read(), Matchers.is(true));
+  }
+
+  @Test
+  public void testMergeEarliestWatermarkIntoSource() throws Exception {
+    WatermarkHoldState<BoundedWindow> value1 =
+        underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
+    WatermarkHoldState<BoundedWindow> value2 =
+        underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR);
+
+    value1.add(new Instant(3000));
+    value2.add(new Instant(5000));
+    value1.add(new Instant(4000));
+    value2.add(new Instant(2000));
+
+    // Merging clears the old values and updates the merged value.
+    StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value1, WINDOW_1);
+
+    assertThat(value1.read(), Matchers.equalTo(new Instant(2000)));
+    assertThat(value2.read(), Matchers.equalTo(null));
+  }
+
+  @Test
+  public void testMergeLatestWatermarkIntoSource() throws Exception {
+    WatermarkHoldState<BoundedWindow> value1 =
+        underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
+    WatermarkHoldState<BoundedWindow> value2 =
+        underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR);
+    WatermarkHoldState<BoundedWindow> value3 =
+        underTest.state(NAMESPACE_3, WATERMARK_LATEST_ADDR);
+
+    value1.add(new Instant(3000));
+    value2.add(new Instant(5000));
+    value1.add(new Instant(4000));
+    value2.add(new Instant(2000));
+
+    // Merging clears the old values and updates the result value.
+    StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value3, WINDOW_1);
+
+    // Merging clears the old values and updates the result value.
+    assertThat(value3.read(), Matchers.equalTo(new Instant(5000)));
+    assertThat(value1.read(), Matchers.equalTo(null));
+    assertThat(value2.read(), Matchers.equalTo(null));
+  }
+
+  @Test
+  public void testSerialization() throws Exception {
+    ApexStateInternalsFactory<String> sif = new ApexStateBackend().
+        newStateInternalsFactory(StringUtf8Coder.of());
+    ApexStateInternals<String> keyAndState = sif.stateInternalsForKey("dummy");
+
+    ValueState<String> value = keyAndState.state(NAMESPACE_1, STRING_VALUE_ADDR);
+    assertEquals(keyAndState.state(NAMESPACE_1, STRING_VALUE_ADDR), value);
+    value.write("hello");
+
+    ApexStateInternalsFactory<String> cloned;
+    assertNotNull("Serialization", cloned = KryoCloneUtils.cloneObject(sif));
+    ApexStateInternals<String> clonedKeyAndState = cloned.stateInternalsForKey("dummy");
+
+    ValueState<String> clonedValue = clonedKeyAndState.state(NAMESPACE_1, STRING_VALUE_ADDR);
+    assertThat(clonedValue.read(), Matchers.equalTo("hello"));
+    assertEquals(clonedKeyAndState.state(NAMESPACE_1, STRING_VALUE_ADDR), value);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f37d9548/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternalsTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternalsTest.java
new file mode 100644
index 0000000..1d7e586
--- /dev/null
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternalsTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.apex.translation.operators;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import com.datatorrent.lib.util.KryoCloneUtils;
+import com.datatorrent.netlet.util.Slice;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+import org.apache.beam.runners.core.StateNamespaces;
+import org.apache.beam.runners.core.TimerInternals.TimerData;
+import org.apache.beam.runners.core.TimerInternals.TimerDataCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.joda.time.Instant;
+import org.junit.Test;
+
+/**
+ * Tests for {@link ApexTimerInternals}.
+ */
+public class ApexTimerInternalsTest {
+
+  @Test
+  public void testEventTimeTimers() {
+    TimerDataCoder timerDataCoder = TimerDataCoder.of(GlobalWindow.Coder.INSTANCE);
+    String key1 = "key1";
+    Instant instant0 = new Instant(0);
+    Instant instant1 = new Instant(1);
+    Instant instant2 = new Instant(2);
+
+    ApexTimerInternals<String> timerInternals = new ApexTimerInternals<>(timerDataCoder);
+    timerInternals.setContext(key1, StringUtf8Coder.of(), Instant.now());
+
+    TimerData timerData0 = TimerData.of("timerData0", StateNamespaces.global(),
+        instant0, TimeDomain.EVENT_TIME);
+    timerInternals.setTimer(timerData0);
+
+    TimerData timerData1 = TimerData.of("timerData1", StateNamespaces.global(),
+        instant1, TimeDomain.EVENT_TIME);
+    timerInternals.setTimer(timerData1);
+
+    Multimap<Slice, TimerData> timers = timerInternals.getTimersReadyToProcess(
+        instant0.getMillis());
+    assertEquals(0, timers.size());
+
+    timers = timerInternals.getTimersReadyToProcess(instant1.getMillis());
+    assertEquals(1, timers.size());
+    assertEquals(Sets.newHashSet(timerData0), Sets.newHashSet(timers.values()));
+
+    timers = timerInternals.getTimersReadyToProcess(instant2.getMillis());
+    assertEquals(1, timers.size());
+    assertEquals(Sets.newHashSet(timerData1), Sets.newHashSet(timers.values()));
+  }
+
+  @Test
+  public void testSerialization() {
+    TimerDataCoder timerDataCoder = TimerDataCoder.of(GlobalWindow.Coder.INSTANCE);
+    TimerData timerData = TimerData.of("arbitrary-id", StateNamespaces.global(),
+        new Instant(0), TimeDomain.EVENT_TIME);
+    String key = "key";
+    ApexTimerInternals<String> timerInternals = new ApexTimerInternals<>(timerDataCoder);
+    timerInternals.setContext(key, StringUtf8Coder.of(), Instant.now());
+    timerInternals.setTimer(timerData);
+    ApexTimerInternals<String> cloned;
+    assertNotNull("Serialization", cloned = KryoCloneUtils.cloneObject(timerInternals));
+    cloned.setContext(key, StringUtf8Coder.of(), Instant.now());
+    Multimap<Slice, TimerData> timers = cloned.getTimersReadyToProcess(new Instant(1).getMillis());
+    assertEquals(1, timers.size());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f37d9548/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
deleted file mode 100644
index 7160e45..0000000
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
+++ /dev/null
@@ -1,368 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.apex.translation.utils;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThat;
-
-import com.datatorrent.lib.util.KryoCloneUtils;
-import java.util.Arrays;
-import org.apache.beam.runners.apex.translation.utils.ApexStateInternals.ApexStateBackend;
-import org.apache.beam.runners.apex.translation.utils.ApexStateInternals.ApexStateInternalsFactory;
-import org.apache.beam.runners.core.StateMerging;
-import org.apache.beam.runners.core.StateNamespace;
-import org.apache.beam.runners.core.StateNamespaceForTest;
-import org.apache.beam.runners.core.StateTag;
-import org.apache.beam.runners.core.StateTags;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
-import org.apache.beam.sdk.util.state.BagState;
-import org.apache.beam.sdk.util.state.CombiningState;
-import org.apache.beam.sdk.util.state.GroupingState;
-import org.apache.beam.sdk.util.state.ReadableState;
-import org.apache.beam.sdk.util.state.ValueState;
-import org.apache.beam.sdk.util.state.WatermarkHoldState;
-import org.hamcrest.Matchers;
-import org.joda.time.Instant;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * Tests for {@link ApexStateInternals}. This is based on the tests for
- * {@code InMemoryStateInternals}.
- */
-public class ApexStateInternalsTest {
-  private static final BoundedWindow WINDOW_1 = new IntervalWindow(new Instant(0), new Instant(10));
-  private static final StateNamespace NAMESPACE_1 = new StateNamespaceForTest("ns1");
-  private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2");
-  private static final StateNamespace NAMESPACE_3 = new StateNamespaceForTest("ns3");
-
-  private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR =
-      StateTags.value("stringValue", StringUtf8Coder.of());
-  private static final StateTag<Object, CombiningState<Integer, int[], Integer>>
-      SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal(
-          "sumInteger", VarIntCoder.of(), Sum.ofIntegers());
-  private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
-      StateTags.bag("stringBag", StringUtf8Coder.of());
-  private static final StateTag<Object, WatermarkHoldState<BoundedWindow>>
-      WATERMARK_EARLIEST_ADDR =
-      StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEarliestInputTimestamp());
-  private static final StateTag<Object, WatermarkHoldState<BoundedWindow>>
-      WATERMARK_LATEST_ADDR =
-      StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtLatestInputTimestamp());
-  private static final StateTag<Object, WatermarkHoldState<BoundedWindow>> WATERMARK_EOW_ADDR =
-      StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEndOfWindow());
-
-  private ApexStateInternals<String> underTest;
-
-  @Before
-  public void initStateInternals() {
-    underTest = new ApexStateInternals.ApexStateBackend()
-        .newStateInternalsFactory(StringUtf8Coder.of())
-        .stateInternalsForKey((String) null);
-  }
-
-  @Test
-  public void testBag() throws Exception {
-    BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
-
-    assertEquals(value, underTest.state(NAMESPACE_1, STRING_BAG_ADDR));
-    assertFalse(value.equals(underTest.state(NAMESPACE_2, STRING_BAG_ADDR)));
-
-    assertThat(value.read(), Matchers.emptyIterable());
-    value.add("hello");
-    assertThat(value.read(), Matchers.containsInAnyOrder("hello"));
-
-    value.add("world");
-    assertThat(value.read(), Matchers.containsInAnyOrder("hello", "world"));
-
-    value.clear();
-    assertThat(value.read(), Matchers.emptyIterable());
-    assertEquals(underTest.state(NAMESPACE_1, STRING_BAG_ADDR), value);
-
-  }
-
-  @Test
-  public void testBagIsEmpty() throws Exception {
-    BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
-
-    assertThat(value.isEmpty().read(), Matchers.is(true));
-    ReadableState<Boolean> readFuture = value.isEmpty();
-    value.add("hello");
-    assertThat(readFuture.read(), Matchers.is(false));
-
-    value.clear();
-    assertThat(readFuture.read(), Matchers.is(true));
-  }
-
-  @Test
-  public void testMergeBagIntoSource() throws Exception {
-    BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
-    BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR);
-
-    bag1.add("Hello");
-    bag2.add("World");
-    bag1.add("!");
-
-    StateMerging.mergeBags(Arrays.asList(bag1, bag2), bag1);
-
-    // Reading the merged bag gets both the contents
-    assertThat(bag1.read(), Matchers.containsInAnyOrder("Hello", "World", "!"));
-    assertThat(bag2.read(), Matchers.emptyIterable());
-  }
-
-  @Test
-  public void testMergeBagIntoNewNamespace() throws Exception {
-    BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
-    BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR);
-    BagState<String> bag3 = underTest.state(NAMESPACE_3, STRING_BAG_ADDR);
-
-    bag1.add("Hello");
-    bag2.add("World");
-    bag1.add("!");
-
-    StateMerging.mergeBags(Arrays.asList(bag1, bag2, bag3), bag3);
-
-    // Reading the merged bag gets both the contents
-    assertThat(bag3.read(), Matchers.containsInAnyOrder("Hello", "World", "!"));
-    assertThat(bag1.read(), Matchers.emptyIterable());
-    assertThat(bag2.read(), Matchers.emptyIterable());
-  }
-
-  @Test
-  public void testCombiningValue() throws Exception {
-    GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
-
-    // State instances are cached, but depend on the namespace.
-    assertEquals(value, underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR));
-    assertFalse(value.equals(underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR)));
-
-    assertThat(value.read(), Matchers.equalTo(0));
-    value.add(2);
-    assertThat(value.read(), Matchers.equalTo(2));
-
-    value.add(3);
-    assertThat(value.read(), Matchers.equalTo(5));
-
-    value.clear();
-    assertThat(value.read(), Matchers.equalTo(0));
-    assertEquals(underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR), value);
-  }
-
-  @Test
-  public void testCombiningIsEmpty() throws Exception {
-    GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
-
-    assertThat(value.isEmpty().read(), Matchers.is(true));
-    ReadableState<Boolean> readFuture = value.isEmpty();
-    value.add(5);
-    assertThat(readFuture.read(), Matchers.is(false));
-
-    value.clear();
-    assertThat(readFuture.read(), Matchers.is(true));
-  }
-
-  @Test
-  public void testMergeCombiningValueIntoSource() throws Exception {
-    CombiningState<Integer, int[], Integer> value1 =
-        underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
-    CombiningState<Integer, int[], Integer> value2 =
-        underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
-
-    value1.add(5);
-    value2.add(10);
-    value1.add(6);
-
-    assertThat(value1.read(), Matchers.equalTo(11));
-    assertThat(value2.read(), Matchers.equalTo(10));
-
-    // Merging clears the old values and updates the result value.
-    StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value1);
-
-    assertThat(value1.read(), Matchers.equalTo(21));
-    assertThat(value2.read(), Matchers.equalTo(0));
-  }
-
-  @Test
-  public void testMergeCombiningValueIntoNewNamespace() throws Exception {
-    CombiningState<Integer, int[], Integer> value1 =
-        underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
-    CombiningState<Integer, int[], Integer> value2 =
-        underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
-    CombiningState<Integer, int[], Integer> value3 =
-        underTest.state(NAMESPACE_3, SUM_INTEGER_ADDR);
-
-    value1.add(5);
-    value2.add(10);
-    value1.add(6);
-
-    StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value3);
-
-    // Merging clears the old values and updates the result value.
-    assertThat(value1.read(), Matchers.equalTo(0));
-    assertThat(value2.read(), Matchers.equalTo(0));
-    assertThat(value3.read(), Matchers.equalTo(21));
-  }
-
-  @Test
-  public void testWatermarkEarliestState() throws Exception {
-    WatermarkHoldState<BoundedWindow> value =
-        underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
-
-    // State instances are cached, but depend on the namespace.
-    assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR));
-    assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR)));
-
-    assertThat(value.read(), Matchers.nullValue());
-    value.add(new Instant(2000));
-    assertThat(value.read(), Matchers.equalTo(new Instant(2000)));
-
-    value.add(new Instant(3000));
-    assertThat(value.read(), Matchers.equalTo(new Instant(2000)));
-
-    value.add(new Instant(1000));
-    assertThat(value.read(), Matchers.equalTo(new Instant(1000)));
-
-    value.clear();
-    assertThat(value.read(), Matchers.equalTo(null));
-    assertEquals(underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR), value);
-  }
-
-  @Test
-  public void testWatermarkLatestState() throws Exception {
-    WatermarkHoldState<BoundedWindow> value =
-        underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
-
-    // State instances are cached, but depend on the namespace.
-    assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR));
-    assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR)));
-
-    assertThat(value.read(), Matchers.nullValue());
-    value.add(new Instant(2000));
-    assertThat(value.read(), Matchers.equalTo(new Instant(2000)));
-
-    value.add(new Instant(3000));
-    assertThat(value.read(), Matchers.equalTo(new Instant(3000)));
-
-    value.add(new Instant(1000));
-    assertThat(value.read(), Matchers.equalTo(new Instant(3000)));
-
-    value.clear();
-    assertThat(value.read(), Matchers.equalTo(null));
-    assertEquals(underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR), value);
-  }
-
-  @Test
-  public void testWatermarkEndOfWindowState() throws Exception {
-    WatermarkHoldState<BoundedWindow> value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR);
-
-    // State instances are cached, but depend on the namespace.
-    assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR));
-    assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_EOW_ADDR)));
-
-    assertThat(value.read(), Matchers.nullValue());
-    value.add(new Instant(2000));
-    assertThat(value.read(), Matchers.equalTo(new Instant(2000)));
-
-    value.clear();
-    assertThat(value.read(), Matchers.equalTo(null));
-    assertEquals(underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR), value);
-  }
-
-  @Test
-  public void testWatermarkStateIsEmpty() throws Exception {
-    WatermarkHoldState<BoundedWindow> value =
-        underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
-
-    assertThat(value.isEmpty().read(), Matchers.is(true));
-    ReadableState<Boolean> readFuture = value.isEmpty();
-    value.add(new Instant(1000));
-    assertThat(readFuture.read(), Matchers.is(false));
-
-    value.clear();
-    assertThat(readFuture.read(), Matchers.is(true));
-  }
-
-  @Test
-  public void testMergeEarliestWatermarkIntoSource() throws Exception {
-    WatermarkHoldState<BoundedWindow> value1 =
-        underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
-    WatermarkHoldState<BoundedWindow> value2 =
-        underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR);
-
-    value1.add(new Instant(3000));
-    value2.add(new Instant(5000));
-    value1.add(new Instant(4000));
-    value2.add(new Instant(2000));
-
-    // Merging clears the old values and updates the merged value.
-    StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value1, WINDOW_1);
-
-    assertThat(value1.read(), Matchers.equalTo(new Instant(2000)));
-    assertThat(value2.read(), Matchers.equalTo(null));
-  }
-
-  @Test
-  public void testMergeLatestWatermarkIntoSource() throws Exception {
-    WatermarkHoldState<BoundedWindow> value1 =
-        underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
-    WatermarkHoldState<BoundedWindow> value2 =
-        underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR);
-    WatermarkHoldState<BoundedWindow> value3 =
-        underTest.state(NAMESPACE_3, WATERMARK_LATEST_ADDR);
-
-    value1.add(new Instant(3000));
-    value2.add(new Instant(5000));
-    value1.add(new Instant(4000));
-    value2.add(new Instant(2000));
-
-    // Merging clears the old values and updates the result value.
-    StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value3, WINDOW_1);
-
-    // Merging clears the old values and updates the result value.
-    assertThat(value3.read(), Matchers.equalTo(new Instant(5000)));
-    assertThat(value1.read(), Matchers.equalTo(null));
-    assertThat(value2.read(), Matchers.equalTo(null));
-  }
-
-  @Test
-  public void testSerialization() throws Exception {
-    ApexStateInternalsFactory<String> sif = new ApexStateBackend().
-        newStateInternalsFactory(StringUtf8Coder.of());
-    ApexStateInternals<String> keyAndState = sif.stateInternalsForKey("dummy");
-
-    ValueState<String> value = keyAndState.state(NAMESPACE_1, STRING_VALUE_ADDR);
-    assertEquals(keyAndState.state(NAMESPACE_1, STRING_VALUE_ADDR), value);
-    value.write("hello");
-
-    ApexStateInternalsFactory<String> cloned;
-    assertNotNull("Serialization", cloned = KryoCloneUtils.cloneObject(sif));
-    ApexStateInternals<String> clonedKeyAndState = cloned.stateInternalsForKey("dummy");
-
-    ValueState<String> clonedValue = clonedKeyAndState.state(NAMESPACE_1, STRING_VALUE_ADDR);
-    assertThat(clonedValue.read(), Matchers.equalTo("hello"));
-    assertEquals(clonedKeyAndState.state(NAMESPACE_1, STRING_VALUE_ADDR), value);
-  }
-
-}


Mime
View raw message