beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [09/11] incubator-beam git commit: Put classes in runners-core package into runners.core namespace
Date Fri, 02 Sep 2016 17:43:56 GMT
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a62e5018/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
new file mode 100644
index 0000000..3948d9e
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
+import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.AppliedCombineFn;
+import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
+import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
+import org.apache.beam.sdk.util.state.MergingStateAccessor;
+import org.apache.beam.sdk.util.state.ReadableState;
+import org.apache.beam.sdk.util.state.StateAccessor;
+import org.apache.beam.sdk.util.state.StateMerging;
+import org.apache.beam.sdk.util.state.StateTag;
+import org.apache.beam.sdk.util.state.StateTags;
+
+/**
+ * {@link ReduceFn} implementing the default reduction behaviors of {@link GroupByKey}.
+ *
+ * <p>Every input value is added to a single {@link CombiningState} cell; the contents of that
+ * cell are read and output when the trigger fires.
+ *
+ * @param <K> The type of key being processed.
+ * @param <InputT> The type of values associated with the key.
+ * @param <AccumT> The accumulator type: {@code Iterable<T>} for {@link #buffering} and the
+ *     combine function's accumulator type for {@link #combining}.
+ * @param <OutputT> The output type that will be produced for each key.
+ * @param <W> The type of windows this operates on.
+ */
+public abstract class SystemReduceFn<K, InputT, AccumT, OutputT, W extends BoundedWindow>
+    extends ReduceFn<K, InputT, OutputT, W> {
+  /** Id shared by the system state tags created by {@link #buffering} and {@link #combining}. */
+  private static final String BUFFER_NAME = "buf";
+
+  /**
+   * Create a {@link SystemReduceFn} that buffers all of the input values in persistent state
+   * and produces an {@code Iterable<T>}.
+   *
+   * @param inputCoder coder used for the buffered values
+   */
+  public static <K, T, W extends BoundedWindow> SystemReduceFn<K, T, Iterable<T>, Iterable<T>, W>
+      buffering(final Coder<T> inputCoder) {
+    final StateTag<Object, BagState<T>> bufferTag =
+        StateTags.makeSystemTagInternal(StateTags.bag(BUFFER_NAME, inputCoder));
+    return new SystemReduceFn<K, T, Iterable<T>, Iterable<T>, W>(bufferTag) {
+      @Override
+      public void prefetchOnMerge(MergingStateAccessor<K, W> state) throws Exception {
+        StateMerging.prefetchBags(state, bufferTag);
+      }
+
+      @Override
+      public void onMerge(OnMergeContext c) throws Exception {
+        StateMerging.mergeBags(c.state(), bufferTag);
+      }
+    };
+  }
+
+  /**
+   * Create a {@link SystemReduceFn} that combines all of the input values using the
+   * {@link CombineFn} wrapped by {@code combineFn}.
+   *
+   * @param keyCoder coder for the key type; not used by this method's body
+   * @param combineFn the applied combine function supplying the accumulator coder and the
+   *     keyed combine function (with or without context)
+   */
+  public static <K, InputT, AccumT, OutputT, W extends BoundedWindow> SystemReduceFn<K, InputT,
+      AccumT, OutputT, W>
+      combining(
+          final Coder<K> keyCoder, final AppliedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
+    final StateTag<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> bufferTag;
+    // The state tag factory differs depending on whether the combine function needs a context.
+    if (combineFn.getFn() instanceof KeyedCombineFnWithContext) {
+      bufferTag = StateTags.makeSystemTagInternal(
+          StateTags.<K, InputT, AccumT, OutputT>keyedCombiningValueWithContext(
+              BUFFER_NAME, combineFn.getAccumulatorCoder(),
+              (KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>) combineFn.getFn()));
+    } else {
+      bufferTag = StateTags.makeSystemTagInternal(
+          StateTags.<K, InputT, AccumT, OutputT>keyedCombiningValue(
+              BUFFER_NAME, combineFn.getAccumulatorCoder(),
+              (KeyedCombineFn<K, InputT, AccumT, OutputT>) combineFn.getFn()));
+    }
+    return new SystemReduceFn<K, InputT, AccumT, OutputT, W>(bufferTag) {
+      @Override
+      public void prefetchOnMerge(MergingStateAccessor<K, W> state) throws Exception {
+        StateMerging.prefetchCombiningValues(state, bufferTag);
+      }
+
+      @Override
+      public void onMerge(OnMergeContext c) throws Exception {
+        StateMerging.mergeCombiningValues(c.state(), bufferTag);
+      }
+    };
+  }
+
+  /** Tag of the state cell that accumulates input values; assigned once at construction. */
+  private final StateTag<? super K, ? extends CombiningState<InputT, OutputT>> bufferTag;
+
+  /**
+   * Create a {@link SystemReduceFn} that stores its input in the state identified by
+   * {@code bufferTag}.
+   */
+  public SystemReduceFn(
+      StateTag<? super K, ? extends CombiningState<InputT, OutputT>> bufferTag) {
+    this.bufferTag = bufferTag;
+  }
+
+  /** Add the value being processed to the buffer state. */
+  @Override
+  public void processValue(ProcessValueContext c) throws Exception {
+    c.state().access(bufferTag).add(c.value());
+  }
+
+  /** Request that the buffer state be read later, ahead of {@link #onTrigger}. */
+  @Override
+  @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT",
+    justification = "prefetch side effect")
+  public void prefetchOnTrigger(StateAccessor<K> state) {
+    state.access(bufferTag).readLater();
+  }
+
+  /** Output the current contents of the buffer state. */
+  @Override
+  public void onTrigger(OnTriggerContext c) throws Exception {
+    c.output(c.state().access(bufferTag).read());
+  }
+
+  /** Clear the buffer state. */
+  @Override
+  public void clearState(Context c) throws Exception {
+    c.state().access(bufferTag).clear();
+  }
+
+  /** Return the buffer state's own emptiness indicator. */
+  @Override
+  public ReadableState<Boolean> isEmpty(StateAccessor<K> state) {
+    return state.access(bufferTag).isEmpty();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a62e5018/runners/core-java/src/main/java/org/apache/beam/runners/core/TriggerRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/TriggerRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/TriggerRunner.java
new file mode 100644
index 0000000..8d0f322
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/TriggerRunner.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.Map;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
+import org.apache.beam.sdk.transforms.windowing.Trigger;
+import org.apache.beam.sdk.util.BitSetCoder;
+import org.apache.beam.sdk.util.ExecutableTrigger;
+import org.apache.beam.sdk.util.FinishedTriggers;
+import org.apache.beam.sdk.util.FinishedTriggersBitSet;
+import org.apache.beam.sdk.util.Timers;
+import org.apache.beam.sdk.util.TriggerContextFactory;
+import org.apache.beam.sdk.util.state.MergingStateAccessor;
+import org.apache.beam.sdk.util.state.StateAccessor;
+import org.apache.beam.sdk.util.state.StateTag;
+import org.apache.beam.sdk.util.state.StateTags;
+import org.apache.beam.sdk.util.state.ValueState;
+import org.joda.time.Instant;
+
+/**
+ * Executes a trigger while managing persistence of information about which subtriggers are
+ * finished. Subtriggers include all recursive trigger expressions as well as the entire trigger.
+ *
+ * <p>Specifically, the responsibilities are:
+ *
+ * <ul>
+ *   <li>Invoking the trigger's methods via its {@link ExecutableTrigger} wrapper by
+ *       constructing the appropriate trigger contexts.</li>
+ *   <li>Committing a record of which subtriggers are finished to persistent state.</li>
+ *   <li>Restoring the record of which subtriggers are finished from persistent state.</li>
+ *   <li>Clearing out the persisted finished set when a caller indicates
+ *       (via {@link #clearFinished}) that it is no longer needed.</li>
+ * </ul>
+ *
+ * <p>These responsibilities are intertwined: trigger contexts include mutable information about
+ * which subtriggers are finished. This class provides the information when building the contexts
+ * and commits the information when the method of the {@link ExecutableTrigger} returns.
+ *
+ * @param <W> The kind of windows being processed.
+ */
+public class TriggerRunner<W extends BoundedWindow> {
+  /** System state cell (id {@code "closed"}) holding the persisted finished bits. */
+  @VisibleForTesting
+  static final StateTag<Object, ValueState<BitSet>> FINISHED_BITS_TAG =
+      StateTags.makeSystemTagInternal(StateTags.value("closed", BitSetCoder.of()));
+
+  private final ExecutableTrigger rootTrigger;
+  private final TriggerContextFactory<W> contextFactory;
+
+  /**
+   * Create a runner for {@code rootTrigger}, whose trigger index must be 0, building trigger
+   * contexts with {@code contextFactory}.
+   */
+  public TriggerRunner(ExecutableTrigger rootTrigger, TriggerContextFactory<W> contextFactory) {
+    checkState(rootTrigger.getTriggerIndex() == 0);
+    this.contextFactory = contextFactory;
+    this.rootTrigger = rootTrigger;
+  }
+
+  /**
+   * Return the finished bits stored in {@code state}, or an all-unfinished set sized for the
+   * whole trigger tree when nothing is stored or the finished set is not needed at all.
+   */
+  private FinishedTriggersBitSet readFinishedBits(ValueState<BitSet> state) {
+    // If no trigger in the tree will ever have finished bits we skip the read entirely, and
+    // callers stay agnostic because they still receive an all-zero set.
+    BitSet persisted = isFinishedSetNeeded() ? state.read() : null;
+    if (persisted != null) {
+      return FinishedTriggersBitSet.fromBitSet(persisted);
+    }
+    return FinishedTriggersBitSet.emptyWithCapacity(rootTrigger.getFirstIndexAfterSubtree());
+  }
+
+  /** Clear {@code state}, unless the finished set is never used (then there is nothing to clear). */
+  private void clearFinishedBits(ValueState<BitSet> state) {
+    if (isFinishedSetNeeded()) {
+      state.clear();
+    }
+  }
+
+  /** Request a later read of the finished bits in {@code state}, if they are used at all. */
+  @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT",
+      justification = "prefetch side effect")
+  private void prefetchFinishedBits(StateAccessor<?> state) {
+    if (isFinishedSetNeeded()) {
+      state.access(FINISHED_BITS_TAG).readLater();
+    }
+  }
+
+  /** Return true if the trigger is closed in the window corresponding to the specified state. */
+  public boolean isClosed(StateAccessor<?> state) {
+    FinishedTriggersBitSet finished = readFinishedBits(state.access(FINISHED_BITS_TAG));
+    return finished.isFinished(rootTrigger);
+  }
+
+  /** Prefetch the finished bits and whatever the trigger needs to process an element. */
+  @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT",
+      justification = "prefetch side effect")
+  public void prefetchForValue(W window, StateAccessor<?> state) {
+    prefetchFinishedBits(state);
+    rootTrigger.getSpec().prefetchOnElement(
+        contextFactory.createStateAccessor(window, rootTrigger));
+  }
+
+  /** Prefetch the finished bits and whatever the trigger needs when it fires. */
+  @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT",
+      justification = "prefetch side effect")
+  public void prefetchOnFire(W window, StateAccessor<?> state) {
+    prefetchFinishedBits(state);
+    rootTrigger.getSpec().prefetchOnFire(
+        contextFactory.createStateAccessor(window, rootTrigger));
+  }
+
+  /** Prefetch the finished bits and whatever the trigger needs to decide whether to fire. */
+  @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT",
+      justification = "prefetch side effect")
+  public void prefetchShouldFire(W window, StateAccessor<?> state) {
+    prefetchFinishedBits(state);
+    rootTrigger.getSpec().prefetchShouldFire(
+        contextFactory.createStateAccessor(window, rootTrigger));
+  }
+
+  /**
+   * Run the trigger logic to deal with a new value, then persist any change to the finished set.
+   */
+  public void processValue(W window, Instant timestamp, Timers timers, StateAccessor<?> state)
+      throws Exception {
+    // Work on a copy so that changes can be detected and so they don't pollute merging.
+    ValueState<BitSet> finishedState = state.access(FINISHED_BITS_TAG);
+    FinishedTriggersBitSet finishedSet = readFinishedBits(finishedState).copy();
+    rootTrigger.invokeOnElement(
+        contextFactory.createOnElementContext(
+            window, timers, timestamp, rootTrigger, finishedSet));
+    persistFinishedSet(state, finishedSet);
+  }
+
+  /**
+   * Prefetch the finished bits of every merging window and whatever the trigger needs to merge.
+   */
+  @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT",
+      justification = "prefetch side effect")
+  public void prefetchForMerge(
+      W window, Collection<W> mergingWindows, MergingStateAccessor<?, W> state) {
+    if (isFinishedSetNeeded()) {
+      for (ValueState<?> windowBits :
+          state.accessInEachMergingWindow(FINISHED_BITS_TAG).values()) {
+        windowBits.readLater();
+      }
+    }
+    rootTrigger.getSpec().prefetchOnMerge(
+        contextFactory.createMergingStateAccessor(window, mergingWindows, rootTrigger));
+  }
+
+  /**
+   * Run the trigger merging logic as part of executing the specified merge.
+   */
+  public void onMerge(W window, Timers timers, MergingStateAccessor<?, W> state) throws Exception {
+    // Work on a copy so that changes can be detected and so they don't pollute merging.
+    ValueState<BitSet> resultState = state.access(FINISHED_BITS_TAG);
+    FinishedTriggersBitSet finishedSet = readFinishedBits(resultState).copy();
+
+    // Collect the finished bits of each merging window, clearing each one once it is read.
+    ImmutableMap.Builder<W, FinishedTriggers> perWindow = ImmutableMap.builder();
+    for (Map.Entry<W, ValueState<BitSet>> merging :
+        state.accessInEachMergingWindow(FINISHED_BITS_TAG).entrySet()) {
+      ValueState<BitSet> windowBits = merging.getValue();
+      // No copy needed here: the trigger context doesn't allow these to be modified.
+      perWindow.put(merging.getKey(), readFinishedBits(windowBits));
+      clearFinishedBits(windowBits);
+    }
+    ImmutableMap<W, FinishedTriggers> mergingFinishedSets = perWindow.build();
+
+    // Let the trigger perform its own merge.
+    rootTrigger.invokeOnMerge(
+        contextFactory.createOnMergeContext(
+            window, timers, rootTrigger, finishedSet, mergingFinishedSets));
+
+    persistFinishedSet(state, finishedSet);
+  }
+
+  /** Ask the trigger whether it should fire for {@code window}; persists nothing. */
+  public boolean shouldFire(W window, Timers timers, StateAccessor<?> state) throws Exception {
+    FinishedTriggers finishedSet = readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();
+    return rootTrigger.invokeShouldFire(
+        contextFactory.base(window, timers, rootTrigger, finishedSet));
+  }
+
+  /** Notify the trigger that it is firing, then persist any change to the finished set. */
+  public void onFire(W window, Timers timers, StateAccessor<?> state) throws Exception {
+    // NOTE(review): the comment previously here read "shouldFire should be false"; presumably
+    // the intended precondition is that shouldFire holds - confirm. In any case it is
+    // considered too expensive to assert.
+    ValueState<BitSet> finishedState = state.access(FINISHED_BITS_TAG);
+    FinishedTriggersBitSet finishedSet = readFinishedBits(finishedState).copy();
+    rootTrigger.invokeOnFire(contextFactory.base(window, timers, rootTrigger, finishedSet));
+    persistFinishedSet(state, finishedSet);
+  }
+
+  /**
+   * Store {@code modifiedFinishedSet} if it differs from what is currently persisted. An empty
+   * set is stored by clearing the state rather than by writing it.
+   */
+  private void persistFinishedSet(
+      StateAccessor<?> state, FinishedTriggersBitSet modifiedFinishedSet) {
+    if (!isFinishedSetNeeded()) {
+      return;
+    }
+
+    ValueState<BitSet> finishedSetState = state.access(FINISHED_BITS_TAG);
+    if (readFinishedBits(finishedSetState).equals(modifiedFinishedSet)) {
+      // Unchanged, so there is nothing to write.
+      return;
+    }
+
+    BitSet bits = modifiedFinishedSet.getBitSet();
+    if (bits.isEmpty()) {
+      finishedSetState.clear();
+    } else {
+      finishedSetState.write(bits);
+    }
+  }
+
+  /**
+   * Clear the finished bits.
+   */
+  public void clearFinished(StateAccessor<?> state) {
+    ValueState<BitSet> finishedState = state.access(FINISHED_BITS_TAG);
+    clearFinishedBits(finishedState);
+  }
+
+  /**
+   * Clear the state used for executing triggers, but leave the finished set to indicate
+   * the window is closed.
+   */
+  public void clearState(W window, Timers timers, StateAccessor<?> state) throws Exception {
+    // The set is only read here, never modified, so no copy is taken.
+    FinishedTriggers finishedSet = readFinishedBits(state.access(FINISHED_BITS_TAG));
+    rootTrigger.invokeClear(contextFactory.base(window, timers, rootTrigger, finishedSet));
+  }
+
+  /** Return false only when the root trigger is a {@link DefaultTrigger}. */
+  private boolean isFinishedSetNeeded() {
+    // TODO: If we know that no trigger in the tree will ever finish, we don't need to do the
+    // lookup. Right now, we special case this for the DefaultTrigger.
+    boolean rootIsDefault = rootTrigger.getSpec() instanceof DefaultTrigger;
+    return !rootIsDefault;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a62e5018/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
new file mode 100644
index 0000000..7d0b608
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
@@ -0,0 +1,539 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.annotations.VisibleForTesting;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.Serializable;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
+import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.WindowTracing;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.MergingStateAccessor;
+import org.apache.beam.sdk.util.state.ReadableState;
+import org.apache.beam.sdk.util.state.StateMerging;
+import org.apache.beam.sdk.util.state.StateTag;
+import org.apache.beam.sdk.util.state.StateTags;
+import org.apache.beam.sdk.util.state.WatermarkHoldState;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * Implements the logic to hold the output watermark for a computation back
+ * until it has seen all the elements it needs based on the input watermark for the
+ * computation.
+ *
+ * <p>The backend ensures the output watermark can never progress beyond the
+ * input watermark for a computation. GroupAlsoByWindows computations may add a 'hold'
+ * to the output watermark in order to prevent it progressing beyond a time within a window.
+ * The hold will be 'cleared' when the associated pane is emitted.
+ *
+ * <p>This class is only intended for use by {@link ReduceFnRunner}. The two evolve together and
+ * will likely break any other uses.
+ *
+ * @param <W> The kind of {@link BoundedWindow} the hold is for.
+ */
+class WatermarkHold<W extends BoundedWindow> implements Serializable {
+  /**
+   * Build the system state tag (id {@code "hold"}) for the output watermark hold used for
+   * elements, parameterized by {@code outputTimeFn}.
+   */
+  public static <W extends BoundedWindow>
+      StateTag<Object, WatermarkHoldState<W>> watermarkHoldTagForOutputTimeFn(
+          OutputTimeFn<? super W> outputTimeFn) {
+    StateTag<Object, WatermarkHoldState<W>> holdTag =
+        StateTags.<W>watermarkStateInternal("hold", outputTimeFn);
+    return StateTags.<Object, WatermarkHoldState<W>>makeSystemTagInternal(holdTag);
+  }
+
+  /**
+   * Tag for state containing end-of-window and garbage collection output watermark holds.
+   * (We can't piggy-back on the data hold state since the outputTimeFn may be
+   * {@link OutputTimeFns#outputAtLatestInputTimestamp()}, in which case every pane
+   * would take the end-of-window time as its element time.)
+   */
+  @VisibleForTesting
+  public static final StateTag<Object, WatermarkHoldState<BoundedWindow>> EXTRA_HOLD_TAG =
+      StateTags.makeSystemTagInternal(StateTags.watermarkStateInternal(
+          "extra", OutputTimeFns.outputAtEarliestInputTimestamp()));
+
+  private final TimerInternals timerInternals;
+  private final WindowingStrategy<?, W> windowingStrategy;
+  private final StateTag<Object, WatermarkHoldState<W>> elementHoldTag;
+
+  /**
+   * Create the hold logic for {@code windowingStrategy}. The element hold state tag is derived
+   * from the strategy's output time function.
+   */
+  public WatermarkHold(TimerInternals timerInternals, WindowingStrategy<?, W> windowingStrategy) {
+    this.elementHoldTag = watermarkHoldTagForOutputTimeFn(windowingStrategy.getOutputTimeFn());
+    this.windowingStrategy = windowingStrategy;
+    this.timerInternals = timerInternals;
+  }
+
+  /**
+   * Add a hold to prevent the output watermark progressing beyond the (possibly adjusted) timestamp
+   * of the element in {@code context}. We allow the actual hold time to be shifted later by
+   * {@link OutputTimeFn#assignOutputTime}, but no further than the end of the window. The hold will
+   * remain until cleared by {@link #extractAndRelease}. Return the timestamp at which the hold
+   * was placed, or {@literal null} if no hold was placed.
+   *
+   * <p>In the following we'll write {@code E} to represent an element's timestamp after passing
+   * through the window strategy's output time function, {@code IWM} for the local input watermark,
+   * {@code OWM} for the local output watermark, and {@code GCWM} for the garbage collection
+   * watermark (which is at {@code IWM - getAllowedLateness}). Time progresses from left to right,
+   * and we write {@code [ ... ]} to denote a bounded window with implied lower bound.
+   *
+   * <p>Note that the GCWM will be the same as the IWM if {@code getAllowedLateness}
+   * is {@code ZERO}.
+   *
+   * <p>Here are the cases we need to handle. They are conceptually considered in the
+   * sequence written since if getAllowedLateness is ZERO the GCWM is the same as the IWM.
+   * <ol>
+   * <li>(Normal)
+   * <pre>
+   *          |
+   *      [   | E        ]
+   *          |
+   *         IWM
+   * </pre>
+   * This is, hopefully, the common and happy case. The element is locally on-time and can
+   * definitely make it to an {@code ON_TIME} pane which we can still set an end-of-window timer
+   * for. We place an element hold at E, which may contribute to the {@code ON_TIME} pane's
+   * timestamp (depending on the output time function). Thus the OWM will not proceed past E
+   * until the next pane fires.
+   *
+   * <li>(Discard - no target window)
+   * <pre>
+   *                       |                            |
+   *      [     E        ] |                            |
+   *                       |                            |
+   *                     GCWM  <-getAllowedLateness->  IWM
+   * </pre>
+   * The element is very locally late. The window has been garbage collected, thus there
+   * is no target pane E could be assigned to. We discard E.
+   *
+   * <li>(Unobservably late)
+   * <pre>
+   *          |    |
+   *      [   | E  |     ]
+   *          |    |
+   *         OWM  IWM
+   * </pre>
+   * The element is locally late, however we can still treat this case as for 'Normal' above
+   * since the IWM has not yet passed the end of the window and the element is ahead of the
+   * OWM. In effect, we get to 'launder' the locally late element and consider it as locally
+   * on-time because no downstream computation can observe the difference.
+   *
+   * <li>(Maybe late 1)
+   * <pre>
+   *          |            |
+   *      [   | E        ] |
+   *          |            |
+   *         OWM          IWM
+   * </pre>
+   * The end-of-window timer may have already fired for this window, and thus an {@code ON_TIME}
+   * pane may have already been emitted. However, if timer firings have been delayed then it
+   * is possible the {@code ON_TIME} pane has not yet been emitted. We can't place an element
+   * hold since we can't be sure if it will be cleared promptly. Thus this element *may* find
+   * its way into an {@code ON_TIME} pane, but if so it will *not* contribute to that pane's
+   * timestamp. We may however set a garbage collection hold if required.
+   *
+   * <li>(Maybe late 2)
+   * <pre>
+   *               |   |
+   *      [     E  |   | ]
+   *               |   |
+   *              OWM IWM
+   * </pre>
+   * The end-of-window timer has not yet fired, so this element may still appear in an
+   * {@code ON_TIME} pane. However the element is too late to contribute to the output
+   * watermark hold, and thus won't contribute to the pane's timestamp. We can still place an
+   * end-of-window hold.
+   *
+   * <li>(Maybe late 3)
+   * <pre>
+   *               |       |
+   *      [     E  |     ] |
+   *               |       |
+   *              OWM     IWM
+   * </pre>
+   * As for the (Maybe late 2) case, however we don't even know if the end-of-window timer
+   * has already fired, or it is about to fire. We can place only the garbage collection hold,
+   * if required.
+   *
+   * <li>(Definitely late)
+   * <pre>
+   *                       |   |
+   *      [     E        ] |   |
+   *                       |   |
+   *                      OWM IWM
+   * </pre>
+   * The element is definitely too late to make an {@code ON_TIME} pane. We are too late to
+   * place an end-of-window hold. We can still place a garbage collection hold if required.
+   *
+   * </ol>
+   */
+  @Nullable
+  public Instant addHolds(ReduceFn<?, ?, ?, W>.ProcessValueContext context) {
+    // An element hold is preferred. Only when none could be placed do we try an end-of-window
+    // hold or, failing that, a garbage collection hold.
+    Instant elementHold = addElementHold(context);
+    if (elementHold != null) {
+      return elementHold;
+    }
+    return addEndOfWindowOrGarbageCollectionHolds(context, false/*paneIsEmpty*/);
+  }
+
+  /**
+   * Apply the window strategy's output time function to {@code timestamp} for {@code window}
+   * and return the result. Fails a state check if the function moved the timestamp earlier, or
+   * moved a timestamp that was not already past the window's maximum timestamp beyond it.
+   */
+  private Instant shift(Instant timestamp, W window) {
+    Instant shifted = windowingStrategy.getOutputTimeFn().assignOutputTime(timestamp, window);
+    checkState(!shifted.isBefore(timestamp),
+        "OutputTimeFn moved element from %s to earlier time %s for window %s",
+        timestamp, shifted, window);
+
+    // A timestamp that already lies past the end of the window is exempt from the bound below.
+    boolean alreadyPastWindowEnd = timestamp.isAfter(window.maxTimestamp());
+    checkState(alreadyPastWindowEnd || !shifted.isAfter(window.maxTimestamp()),
+        "OutputTimeFn moved element from %s to %s which is beyond end of "
+            + "window %s",
+        timestamp, shifted, window);
+
+    return shifted;
+  }
+
+  /**
+   * Try to place an 'element hold'. Return the {@link Instant} of the hold (ie the element
+   * timestamp plus any forward shift requested by the
+   * {@link WindowingStrategy#getOutputTimeFn}), or {@literal null} if no hold was placed.
+   * A hold is placed only when both of the following are true:
+   * <ol>
+   * <li>The backend will be able to respect it. In other words the output watermark cannot
+   * be ahead of the proposed hold time.
+   * <li>A timer will be set (by {@link ReduceFnRunner}) to clear the hold by the end of the
+   * window. In other words the input watermark cannot be ahead of the end of the window.
+   * </ol>
+   * The hold ensures the pane which incorporates the element will not be considered late by
+   * any downstream computation when it is eventually emitted.
+   */
+  @Nullable
+  private Instant addElementHold(ReduceFn<?, ?, ?, W>.ProcessValueContext context) {
+    // Let the output time function move the hold timestamp forward to encourage progress.
+    // (A later hold impedes the output watermark less, which in turn encourages
+    // end-of-window triggers to fire earlier in following computations.)
+    Instant elementHold = shift(context.timestamp(), context.window());
+
+    Instant outputWM = timerInternals.currentOutputWatermarkTime();
+    Instant inputWM = timerInternals.currentInputWatermarkTime();
+
+    // TODO: These case labels could be tightened.
+    // See the case analysis in addHolds above for the motivation.
+    boolean behindOutputWM = outputWM != null && elementHold.isBefore(outputWM);
+    // Only examined when the first condition does not already rule the hold out.
+    boolean windowEndBehindInputWM =
+        !behindOutputWM && context.window().maxTimestamp().isBefore(inputWM);
+    boolean tooLate = behindOutputWM || windowEndBehindInputWM;
+
+    String which;
+    if (behindOutputWM) {
+      which = "too late to effect output watermark";
+    } else if (windowEndBehindInputWM) {
+      which = "too late for end-of-window timer";
+    } else {
+      which = "on time";
+      checkState(!elementHold.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE),
+          "Element hold %s is beyond end-of-time", elementHold);
+      context.state().access(elementHoldTag).add(elementHold);
+    }
+
+    WindowTracing.trace(
+        "WatermarkHold.addHolds: element hold at {} is {} for "
+        + "key:{}; window:{}; inputWatermark:{}; outputWatermark:{}",
+        elementHold, which, context.key(), context.window(), inputWM,
+        outputWM);
+
+    return tooLate ? null : elementHold;
+  }
+
+  /**
+   * Place an end-of-window hold, or a garbage collection hold (if required) when it is too late
+   * for the former. Return the {@link Instant} of the hold that was placed, or {@literal null}
+   * if neither hold was placed.
+   */
+  @Nullable
+  private Instant addEndOfWindowOrGarbageCollectionHolds(
+      ReduceFn<?, ?, ?, W>.Context context, boolean paneIsEmpty) {
+    Instant eowHold = addEndOfWindowHold(context, paneIsEmpty);
+    // The garbage collection hold is attempted only when no end-of-window hold was placed.
+    return eowHold != null ? eowHold : addGarbageCollectionHold(context, paneIsEmpty);
+  }
+
+  /**
+   * Attempt to add an 'end-of-window hold'. Return the {@link Instant} at which the hold was added
+   * (ie the end of window time), or {@literal null} if no end of window hold is possible and we
+   * should fall back to a garbage collection hold.
+   *
+   * <p>We only add the hold if we can be sure a timer will be set (by {@link ReduceFnRunner})
+   * to clear it. In other words, the input watermark cannot be ahead of the end of window time.
+   *
+   * <p>An end-of-window hold is added in two situations:
+   * <ol>
+   * <li>An incoming element came in behind the output watermark (so we are too late for placing
+   * the usual element hold), but it may still be possible to include the element in an
+   * {@link Timing#ON_TIME} pane. We place the end of window hold to ensure that pane will
+   * not be considered late by any downstream computation.
+   * <li>We guarantee an {@link Timing#ON_TIME} pane will be emitted for all windows which saw at
+   * least one element, even if that {@link Timing#ON_TIME} pane is empty. Thus when elements in
+   * a pane are processed due to a fired trigger we must set both an end of window timer and an end
+   * of window hold. Again, the hold ensures the {@link Timing#ON_TIME} pane will not be considered
+   * late by any downstream computation.
+   * </ol>
+   */
+  @Nullable
+  private Instant addEndOfWindowHold(ReduceFn<?, ?, ?, W>.Context context, boolean paneIsEmpty) {
+    Instant outputWM = timerInternals.currentOutputWatermarkTime();
+    Instant inputWM = timerInternals.currentInputWatermarkTime();
+    Instant eowHold = context.window().maxTimestamp();
+
+    if (eowHold.isBefore(inputWM)) {
+      WindowTracing.trace(
+          "WatermarkHold.addEndOfWindowHold: end-of-window hold at {} is too late for "
+              + "end-of-window timer for key:{}; window:{}; inputWatermark:{}; outputWatermark:{}",
+          eowHold, context.key(), context.window(), inputWM, outputWM);
+      return null;
+    }
+
+    checkState(outputWM == null || !eowHold.isBefore(outputWM),
+        "End-of-window hold %s cannot be before output watermark %s",
+        eowHold, outputWM);
+    checkState(!eowHold.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE),
+        "End-of-window hold %s is beyond end-of-time", eowHold);
+    // If paneIsEmpty then this hold is just for empty ON_TIME panes, so we want to keep
+    // the hold away from the combining function in elementHoldTag.
+    // However if !paneIsEmpty then it could make sense to use the elementHoldTag here.
+    // Alas, onMerge is forced to add an end of window or garbage collection hold without
+    // knowing whether an element hold is already in place (stopping to check is too expensive).
+    // Thus it would end up adding an element hold at the end of the window which could
+    // upset the elementHoldTag combining function.
+    context.state().access(EXTRA_HOLD_TAG).add(eowHold);
+    WindowTracing.trace(
+        "WatermarkHold.addEndOfWindowHold: end-of-window hold at {} is on time for "
+            + "key:{}; window:{}; inputWatermark:{}; outputWatermark:{}",
+        eowHold, context.key(), context.window(), inputWM, outputWM);
+    return eowHold;
+  }
+
+  /**
+   * Attempt to add a 'garbage collection hold' if it is required. Return the {@link Instant} at
+   * which the hold was added (ie the end of window time plus allowed lateness),
+   * or {@literal null} if no hold was added.
+   *
+   * <p>We only add the hold if it is distinct from what would be added by
+   * {@link #addEndOfWindowHold}. In other words, {@link WindowingStrategy#getAllowedLateness}
+   * must be non-zero.
+   *
+   * <p>A garbage collection hold is added in two situations:
+   * <ol>
+   * <li>An incoming element came in behind the output watermark, and was too late for placing
+   * the usual element hold or an end of window hold. Place the garbage collection hold so that
+   * we can guarantee when the pane is finally triggered its output will not be dropped due to
+   * excessive lateness by any downstream computation.
+   * <li>The {@link WindowingStrategy#getClosingBehavior()} is
+   * {@link ClosingBehavior#FIRE_ALWAYS}, and thus we guarantee a final pane will be emitted
+   * for all windows which saw at least one element. Again, the garbage collection hold guarantees
+   * that any empty final pane can be given a timestamp which will not be considered beyond
+   * allowed lateness by any downstream computation.
+   * </ol>
+   *
+   * <p>We use {@code paneIsEmpty} to distinguish cases 1 and 2.
+   */
+  @Nullable
+  private Instant addGarbageCollectionHold(
+      ReduceFn<?, ?, ?, W>.Context context, boolean paneIsEmpty) {
+    Instant outputWM = timerInternals.currentOutputWatermarkTime();
+    Instant inputWM = timerInternals.currentInputWatermarkTime();
+    Instant eow = context.window().maxTimestamp();
+    Instant gcHold = eow.plus(windowingStrategy.getAllowedLateness());
+
+    if (!windowingStrategy.getAllowedLateness().isLongerThan(Duration.ZERO)) {
+      WindowTracing.trace(
+          "WatermarkHold.addGarbageCollectionHold: garbage collection hold at {} is unnecessary "
+              + "since no allowed lateness for key:{}; window:{}; inputWatermark:{}; "
+              + "outputWatermark:{}",
+          gcHold, context.key(), context.window(), inputWM, outputWM);
+      return null;
+    }
+
+    if (paneIsEmpty && context.windowingStrategy().getClosingBehavior()
+        == ClosingBehavior.FIRE_IF_NON_EMPTY) {
+      WindowTracing.trace(
+          "WatermarkHold.addGarbageCollectionHold: garbage collection hold at {} is unnecessary "
+              + "since empty pane and FIRE_IF_NON_EMPTY for key:{}; window:{}; inputWatermark:{}; "
+              + "outputWatermark:{}",
+          gcHold, context.key(), context.window(), inputWM, outputWM);
+      return null;
+    }
+
+    checkState(!gcHold.isBefore(inputWM),
+        "Garbage collection hold %s cannot be before input watermark %s",
+        gcHold, inputWM);
+    checkState(!gcHold.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE),
+        "Garbage collection hold %s is beyond end-of-time", gcHold);
+    // Same EXTRA_HOLD_TAG vs elementHoldTag discussion as in addEndOfWindowHold above.
+    context.state().access(EXTRA_HOLD_TAG).add(gcHold);
+
+    WindowTracing.trace(
+        "WatermarkHold.addGarbageCollectionHold: garbage collection hold at {} is on time for "
+            + "key:{}; window:{}; inputWatermark:{}; outputWatermark:{}",
+        gcHold, context.key(), context.window(), inputWM, outputWM);
+    return gcHold;
+  }
+
+  /**
+   * Prefetch watermark holds in preparation for merging.
+   */
+  public void prefetchOnMerge(MergingStateAccessor<?, W> state) {
+    StateMerging.prefetchWatermarks(state, elementHoldTag);
+  }
+
+  /**
+   * Updates the watermark hold when windows merge if it is possible the merged value does
+   * not equal all of the existing holds. For example, if the new window implies a later
+   * watermark hold, then earlier holds may be released.
+   */
+  public void onMerge(ReduceFn<?, ?, ?, W>.OnMergeContext context) {
+    WindowTracing.debug("WatermarkHold.onMerge: for key:{}; window:{}; inputWatermark:{}; "
+            + "outputWatermark:{}",
+        context.key(), context.window(), timerInternals.currentInputWatermarkTime(),
+        timerInternals.currentOutputWatermarkTime());
+    StateMerging.mergeWatermarks(context.state(), elementHoldTag, context.window());
+    // If we had a cheap way to determine if we have an element hold then we could
+    // avoid adding an unnecessary end-of-window or garbage collection hold.
+    // Simply reading the above merged watermark would impose an additional read for the
+    // common case that the active window has just one underlying state address window and
+    // the hold depends on the min of the element timestamps.
+    // At least one merged window must be non-empty for the merge to have been triggered.
+    StateMerging.clear(context.state(), EXTRA_HOLD_TAG);
+    addEndOfWindowOrGarbageCollectionHolds(context, false /*paneIsEmpty*/);
+  }
+
+  /**
+   * Result of {@link #extractAndRelease}.
+   */
+  public static class OldAndNewHolds {
+    public final Instant oldHold;
+    @Nullable
+    public final Instant newHold;
+
+    public OldAndNewHolds(Instant oldHold, @Nullable Instant newHold) {
+      this.oldHold = oldHold;
+      this.newHold = newHold;
+    }
+  }
+
+  /**
+   * Return (a future for) the earliest hold for {@code context}. Clear all the holds after
+   * reading, but add/restore an end-of-window or garbage collection hold if required.
+   *
+   * <p>The returned timestamp is the output timestamp according to the {@link OutputTimeFn}
+   * from the windowing strategy of this {@link WatermarkHold}, combined across all the non-late
+   * elements in the current pane. If there is no such value the timestamp is the end
+   * of the window.
+   */
+  public ReadableState<OldAndNewHolds> extractAndRelease(
+      final ReduceFn<?, ?, ?, W>.Context context, final boolean isFinished) {
+    WindowTracing.debug(
+        "WatermarkHold.extractAndRelease: for key:{}; window:{}; inputWatermark:{}; "
+            + "outputWatermark:{}",
+        context.key(), context.window(), timerInternals.currentInputWatermarkTime(),
+        timerInternals.currentOutputWatermarkTime());
+    final WatermarkHoldState<W> elementHoldState = context.state().access(elementHoldTag);
+    final WatermarkHoldState<BoundedWindow> extraHoldState = context.state().access(EXTRA_HOLD_TAG);
+    return new ReadableState<OldAndNewHolds>() {
+      @Override
+      @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT",
+        justification = "")
+      public ReadableState<OldAndNewHolds> readLater() {
+        elementHoldState.readLater();
+        extraHoldState.readLater();
+        return this;
+      }
+
+      @Override
+      public OldAndNewHolds read() {
+        // Read both the element and extra holds.
+        Instant elementHold = elementHoldState.read();
+        Instant extraHold = extraHoldState.read();
+        Instant oldHold;
+        // Find the minimum, accounting for null.
+        if (elementHold == null) {
+          oldHold = extraHold;
+        } else if (extraHold == null) {
+          oldHold = elementHold;
+        } else if (elementHold.isBefore(extraHold)) {
+          oldHold = elementHold;
+        } else {
+          oldHold = extraHold;
+        }
+        if (oldHold == null || oldHold.isAfter(context.window().maxTimestamp())) {
+          // If no hold (eg because all elements came in behind the output watermark), or
+          // the hold was for garbage collection, take the end of window as the result.
+          WindowTracing.debug(
+              "WatermarkHold.extractAndRelease.read: clipping from {} to end of window "
+              + "for key:{}; window:{}",
+              oldHold, context.key(), context.window());
+          oldHold = context.window().maxTimestamp();
+        }
+        WindowTracing.debug("WatermarkHold.extractAndRelease.read: clearing for key:{}; window:{}",
+            context.key(), context.window());
+
+        // Clear the underlying state to allow the output watermark to progress.
+        elementHoldState.clear();
+        extraHoldState.clear();
+
+        @Nullable Instant newHold = null;
+        if (!isFinished) {
+          // Only need to leave behind an end-of-window or garbage collection hold
+          // if future elements will be processed.
+          newHold = addEndOfWindowOrGarbageCollectionHolds(context, true /*paneIsEmpty*/);
+        }
+
+        return new OldAndNewHolds(oldHold, newHold);
+      }
+    };
+  }
+
+  /**
+   * Clear any remaining holds.
+   */
+  public void clearHolds(ReduceFn<?, ?, ?, W>.Context context) {
+    WindowTracing.debug(
+        "WatermarkHold.clearHolds: For key:{}; window:{}; inputWatermark:{}; outputWatermark:{}",
+        context.key(), context.window(), timerInternals.currentInputWatermarkTime(),
+        timerInternals.currentOutputWatermarkTime());
+    context.state().access(elementHoldTag).clear();
+    context.state().access(EXTRA_HOLD_TAG).clear();
+  }
+
+  /**
+   * Return the current data hold, or null if none. Does not clear. For debugging only.
+   */
+  @Nullable
+  public Instant getDataCurrent(ReduceFn<?, ?, ?, W>.Context context) {
+    return context.state().access(elementHoldTag).read();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a62e5018/runners/core-java/src/main/java/org/apache/beam/sdk/util/AssignWindows.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/AssignWindows.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/AssignWindows.java
deleted file mode 100644
index af28052..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/AssignWindows.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.values.PCollection;
-
-/**
- * {@link PTransform} that uses privileged (non-user-facing) APIs to assign elements of a
- * {@link PCollection} to windows according to the provided {@link WindowFn}.
- *
- * @param <T> Type of elements being windowed
- * @param <W> Window type
- */
-public class AssignWindows<T, W extends BoundedWindow>
-    extends PTransform<PCollection<T>, PCollection<T>> {
-
-  private WindowFn<? super T, W> fn;
-
-  public AssignWindows(WindowFn<? super T, W> fn) {
-    this.fn = fn;
-  }
-
-  @Override
-  public PCollection<T> apply(PCollection<T> input) {
-    return input.apply("AssignWindows", ParDo.of(new AssignWindowsDoFn<>(fn)));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a62e5018/runners/core-java/src/main/java/org/apache/beam/sdk/util/AssignWindowsDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/AssignWindowsDoFn.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/AssignWindowsDoFn.java
deleted file mode 100644
index 7e26253..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/AssignWindowsDoFn.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.common.collect.Iterables;
-import java.util.Collection;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.joda.time.Instant;
-
-/**
- * {@link OldDoFn} that tags elements of a {@link PCollection} with windows, according to the
- * provided {@link WindowFn}.
- *
- * @param <T> Type of elements being windowed
- * @param <W> Window type
- */
-@SystemDoFnInternal
-public class AssignWindowsDoFn<T, W extends BoundedWindow> extends OldDoFn<T, T>
-    implements RequiresWindowAccess {
-  private WindowFn<? super T, W> fn;
-
-  public AssignWindowsDoFn(WindowFn<? super T, W> fn) {
-    this.fn =
-        checkNotNull(
-            fn,
-            "%s provided to %s cannot be null",
-            WindowFn.class.getSimpleName(),
-            AssignWindowsDoFn.class.getSimpleName());
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  public void processElement(final ProcessContext c) throws Exception {
-    Collection<W> windows =
-        ((WindowFn<T, W>) fn).assignWindows(
-            ((WindowFn<T, W>) fn).new AssignContext() {
-                @Override
-                public T element() {
-                  return c.element();
-                }
-
-                @Override
-                public Instant timestamp() {
-                  return c.timestamp();
-                }
-
-                @Override
-                public BoundedWindow window() {
-                  return Iterables.getOnlyElement(c.windowingInternals().windows());
-                }
-              });
-
-    c.windowingInternals()
-        .outputWindowedValue(c.element(), c.timestamp(), windows, PaneInfo.NO_FIRING);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a62e5018/runners/core-java/src/main/java/org/apache/beam/sdk/util/BatchTimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/BatchTimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/BatchTimerInternals.java
deleted file mode 100644
index f3e84a6..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/BatchTimerInternals.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.common.base.MoreObjects;
-import java.util.HashSet;
-import java.util.PriorityQueue;
-import java.util.Set;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.joda.time.Instant;
-
-/**
- * TimerInternals that uses priority queues to manage the timers that are ready to fire.
- */
-public class BatchTimerInternals implements TimerInternals {
-  /** Set of timers that are scheduled used for deduplicating timers. */
-  private Set<TimerData> existingTimers = new HashSet<>();
-
-  // Keep these queues separate so we can advance over them separately.
-  private PriorityQueue<TimerData> watermarkTimers = new PriorityQueue<>(11);
-  private PriorityQueue<TimerData> processingTimers = new PriorityQueue<>(11);
-
-  private Instant inputWatermarkTime;
-  private Instant processingTime;
-
-  private PriorityQueue<TimerData> queue(TimeDomain domain) {
-    return TimeDomain.EVENT_TIME.equals(domain) ? watermarkTimers : processingTimers;
-  }
-
-  public BatchTimerInternals(Instant processingTime) {
-    this.processingTime = processingTime;
-    this.inputWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
-  }
-
-  @Override
-  public void setTimer(TimerData timer) {
-    if (existingTimers.add(timer)) {
-      queue(timer.getDomain()).add(timer);
-    }
-  }
-
-  @Override
-  public void deleteTimer(TimerData timer) {
-    existingTimers.remove(timer);
-    queue(timer.getDomain()).remove(timer);
-  }
-
-  @Override
-  public Instant currentProcessingTime() {
-    return processingTime;
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @return {@link BoundedWindow#TIMESTAMP_MAX_VALUE}: in batch mode, upstream processing
-   * is already complete.
-   */
-  @Override
-  @Nullable
-  public Instant currentSynchronizedProcessingTime() {
-    return BoundedWindow.TIMESTAMP_MAX_VALUE;
-  }
-
-  @Override
-  public Instant currentInputWatermarkTime() {
-    return inputWatermarkTime;
-  }
-
-  @Override
-  @Nullable
-  public Instant currentOutputWatermarkTime() {
-    // The output watermark is always undefined in batch mode.
-    return null;
-  }
-
-  @Override
-  public String toString() {
-    return MoreObjects.toStringHelper(getClass())
-        .add("watermarkTimers", watermarkTimers)
-        .add("processingTimers", processingTimers)
-        .toString();
-  }
-
-  public void advanceInputWatermark(ReduceFnRunner<?, ?, ?, ?> runner, Instant newInputWatermark)
-      throws Exception {
-    checkState(!newInputWatermark.isBefore(inputWatermarkTime),
-        "Cannot move input watermark time backwards from %s to %s", inputWatermarkTime,
-        newInputWatermark);
-    inputWatermarkTime = newInputWatermark;
-    advance(runner, newInputWatermark, TimeDomain.EVENT_TIME);
-  }
-
-  public void advanceProcessingTime(ReduceFnRunner<?, ?, ?, ?> runner, Instant newProcessingTime)
-      throws Exception {
-    checkState(!newProcessingTime.isBefore(processingTime),
-        "Cannot move processing time backwards from %s to %s", processingTime, newProcessingTime);
-    processingTime = newProcessingTime;
-    advance(runner, newProcessingTime, TimeDomain.PROCESSING_TIME);
-  }
-
-  private void advance(ReduceFnRunner<?, ?, ?, ?> runner, Instant newTime, TimeDomain domain)
-      throws Exception {
-    PriorityQueue<TimerData> timers = queue(domain);
-    boolean shouldFire = false;
-
-    do {
-      TimerData timer = timers.peek();
-      // Timers fire if the new time is ahead of the timer
-      shouldFire = timer != null && newTime.isAfter(timer.getTimestamp());
-      if (shouldFire) {
-        // Remove before firing, so that if the trigger adds another identical
-        // timer we don't remove it.
-        timers.remove();
-        runner.onTimer(timer);
-      }
-    } while (shouldFire);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a62e5018/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunner.java
deleted file mode 100644
index 49206d1..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunner.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.OldDoFn.ProcessContext;
-import org.apache.beam.sdk.values.KV;
-
-/**
- * An wrapper interface that represents the execution of a {@link OldDoFn}.
- */
-public interface DoFnRunner<InputT, OutputT> {
-  /**
-   * Prepares and calls {@link OldDoFn#startBundle}.
-   */
-  public void startBundle();
-
-  /**
-   * Calls {@link OldDoFn#processElement} with a {@link ProcessContext} containing the current
-   * element.
-   */
-  public void processElement(WindowedValue<InputT> elem);
-
-  /**
-   * Calls {@link OldDoFn#finishBundle} and performs additional tasks, such as
-   * flushing in-memory states.
-   */
-  public void finishBundle();
-
-  /**
-   * An internal interface for signaling that a {@link OldDoFn} requires late data dropping.
-   */
-  public interface ReduceFnExecutor<K, InputT, OutputT, W> {
-    /**
-     * Gets this object as a {@link OldDoFn}.
-     *
-     * Most implementors of this interface are expected to be {@link OldDoFn} instances, and will
-     * return themselves.
-     */
-    OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> asDoFn();
-
-    /**
-     * Returns an aggregator that tracks elements that are dropped due to being late.
-     */
-    Aggregator<Long, Long> getDroppedDueToLatenessAggregator();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a62e5018/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java
deleted file mode 100644
index f0cfd74..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java
+++ /dev/null
@@ -1,551 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.IterableCoder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.DoFnRunners.OutputManager;
-import org.apache.beam.sdk.util.ExecutionContext.StepContext;
-import org.apache.beam.sdk.util.state.StateInternals;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.joda.time.Instant;
-import org.joda.time.format.PeriodFormat;
-
-/**
- * A base implementation of {@link DoFnRunner}.
- *
- * <p> Sub-classes should override {@link #invokeProcessElement}.
- */
-public abstract class DoFnRunnerBase<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
-
-  /** The {@link OldDoFn} being run. */
-  public final OldDoFn<InputT, OutputT> fn;
-
-  /** The context used for running the {@link OldDoFn}. */
-  public final DoFnContext<InputT, OutputT> context;
-
-  protected DoFnRunnerBase(
-      PipelineOptions options,
-      OldDoFn<InputT, OutputT> fn,
-      SideInputReader sideInputReader,
-      OutputManager outputManager,
-      TupleTag<OutputT> mainOutputTag,
-      List<TupleTag<?>> sideOutputTags,
-      StepContext stepContext,
-      AggregatorFactory aggregatorFactory,
-      WindowingStrategy<?, ?> windowingStrategy) {
-    this.fn = fn;
-    this.context = new DoFnContext<>(
-        options,
-        fn,
-        sideInputReader,
-        outputManager,
-        mainOutputTag,
-        sideOutputTags,
-        stepContext,
-        aggregatorFactory,
-        windowingStrategy == null ? null : windowingStrategy.getWindowFn());
-  }
-
-  /**
-   * An implementation of {@code OutputManager} using simple lists, for testing and in-memory
-   * contexts such as the {@code DirectRunner}.
-   */
-  public static class ListOutputManager implements OutputManager {
-
-    private Map<TupleTag<?>, List<WindowedValue<?>>> outputLists = Maps.newHashMap();
-
-    @Override
-    public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
-      @SuppressWarnings({"rawtypes", "unchecked"})
-      List<WindowedValue<T>> outputList = (List) outputLists.get(tag);
-
-      if (outputList == null) {
-        outputList = Lists.newArrayList();
-        @SuppressWarnings({"rawtypes", "unchecked"})
-        List<WindowedValue<?>> untypedList = (List) outputList;
-        outputLists.put(tag, untypedList);
-      }
-
-      outputList.add(output);
-    }
-
-    public <T> List<WindowedValue<T>> getOutput(TupleTag<T> tag) {
-      // Safe cast by design, inexpressible in Java without rawtypes
-      @SuppressWarnings({"rawtypes", "unchecked"})
-      List<WindowedValue<T>> outputList = (List) outputLists.get(tag);
-      return (outputList != null) ? outputList : Collections.<WindowedValue<T>>emptyList();
-    }
-  }
-
-  @Override
-  public void startBundle() {
-    // This can contain user code. Wrap it in case it throws an exception.
-    try {
-      fn.startBundle(context);
-    } catch (Throwable t) {
-      // Exception in user code.
-      throw wrapUserCodeException(t);
-    }
-  }
-
-  @Override
-  public void processElement(WindowedValue<InputT> elem) {
-    if (elem.getWindows().size() <= 1
-        || (!RequiresWindowAccess.class.isAssignableFrom(fn.getClass())
-            && context.sideInputReader.isEmpty())) {
-      invokeProcessElement(elem);
-    } else {
-      // We could modify the windowed value (and the processContext) to
-      // avoid repeated allocations, but this is more straightforward.
-      for (WindowedValue<InputT> windowedValue : elem.explodeWindows()) {
-        invokeProcessElement(windowedValue);
-      }
-    }
-  }
-
-  /**
-   * Invokes {@link OldDoFn#processElement} after certain pre-processings has been done in
-   * {@link DoFnRunnerBase#processElement}.
-   */
-  protected abstract void invokeProcessElement(WindowedValue<InputT> elem);
-
-  @Override
-  public void finishBundle() {
-    // This can contain user code. Wrap it in case it throws an exception.
-    try {
-      fn.finishBundle(context);
-    } catch (Throwable t) {
-      // Exception in user code.
-      throw wrapUserCodeException(t);
-    }
-  }
-
-  /**
-   * A concrete implementation of {@code OldDoFn.Context} used for running a {@link OldDoFn}.
-   *
-   * @param <InputT> the type of the {@link OldDoFn} (main) input elements
-   * @param <OutputT> the type of the {@link OldDoFn} (main) output elements
-   */
-  private static class DoFnContext<InputT, OutputT>
-      extends OldDoFn<InputT, OutputT>.Context {
-    private static final int MAX_SIDE_OUTPUTS = 1000;
-
-    final PipelineOptions options;
-    final OldDoFn<InputT, OutputT> fn;
-    final SideInputReader sideInputReader;
-    final OutputManager outputManager;
-    final TupleTag<OutputT> mainOutputTag;
-    final StepContext stepContext;
-    final AggregatorFactory aggregatorFactory;
-    final WindowFn<?, ?> windowFn;
-
-    /**
-     * The set of known output tags, some of which may be undeclared, so we can throw an
-     * exception when it exceeds {@link #MAX_SIDE_OUTPUTS}.
-     */
-    private Set<TupleTag<?>> outputTags;
-
-    public DoFnContext(PipelineOptions options,
-                       OldDoFn<InputT, OutputT> fn,
-                       SideInputReader sideInputReader,
-                       OutputManager outputManager,
-                       TupleTag<OutputT> mainOutputTag,
-                       List<TupleTag<?>> sideOutputTags,
-                       StepContext stepContext,
-                       AggregatorFactory aggregatorFactory,
-                       WindowFn<?, ?> windowFn) {
-      fn.super();
-      this.options = options;
-      this.fn = fn;
-      this.sideInputReader = sideInputReader;
-      this.outputManager = outputManager;
-      this.mainOutputTag = mainOutputTag;
-      this.outputTags = Sets.newHashSet();
-
-      outputTags.add(mainOutputTag);
-      for (TupleTag<?> sideOutputTag : sideOutputTags) {
-        outputTags.add(sideOutputTag);
-      }
-
-      this.stepContext = stepContext;
-      this.aggregatorFactory = aggregatorFactory;
-      this.windowFn = windowFn;
-      super.setupDelegateAggregators();
-    }
-
-    //////////////////////////////////////////////////////////////////////////////
-
-    @Override
-    public PipelineOptions getPipelineOptions() {
-      return options;
-    }
-
-    <T, W extends BoundedWindow> WindowedValue<T> makeWindowedValue(
-        T output, Instant timestamp, Collection<W> windows, PaneInfo pane) {
-      final Instant inputTimestamp = timestamp;
-
-      if (timestamp == null) {
-        timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
-      }
-
-      if (windows == null) {
-        try {
-          // The windowFn can never succeed at accessing the element, so its type does not
-          // matter here
-          @SuppressWarnings("unchecked")
-          WindowFn<Object, W> objectWindowFn = (WindowFn<Object, W>) windowFn;
-          windows = objectWindowFn.assignWindows(objectWindowFn.new AssignContext() {
-            @Override
-            public Object element() {
-              throw new UnsupportedOperationException(
-                  "WindowFn attempted to access input element when none was available");
-            }
-
-            @Override
-            public Instant timestamp() {
-              if (inputTimestamp == null) {
-                throw new UnsupportedOperationException(
-                    "WindowFn attempted to access input timestamp when none was available");
-              }
-              return inputTimestamp;
-            }
-
-            @Override
-            public W window() {
-              throw new UnsupportedOperationException(
-                  "WindowFn attempted to access input windows when none were available");
-            }
-          });
-        } catch (Exception e) {
-          throw UserCodeException.wrap(e);
-        }
-      }
-
-      return WindowedValue.of(output, timestamp, windows, pane);
-    }
-
-    public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
-      if (!sideInputReader.contains(view)) {
-        throw new IllegalArgumentException("calling sideInput() with unknown view");
-      }
-      BoundedWindow sideInputWindow =
-          view.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(mainInputWindow);
-      return sideInputReader.get(view, sideInputWindow);
-    }
-
-    void outputWindowedValue(
-        OutputT output,
-        Instant timestamp,
-        Collection<? extends BoundedWindow> windows,
-        PaneInfo pane) {
-      outputWindowedValue(makeWindowedValue(output, timestamp, windows, pane));
-    }
-
-    void outputWindowedValue(WindowedValue<OutputT> windowedElem) {
-      outputManager.output(mainOutputTag, windowedElem);
-      if (stepContext != null) {
-        stepContext.noteOutput(windowedElem);
-      }
-    }
-
-    protected <T> void sideOutputWindowedValue(TupleTag<T> tag,
-                                               T output,
-                                               Instant timestamp,
-                                               Collection<? extends BoundedWindow> windows,
-                                               PaneInfo pane) {
-      sideOutputWindowedValue(tag, makeWindowedValue(output, timestamp, windows, pane));
-    }
-
-    protected <T> void sideOutputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedElem) {
-      if (!outputTags.contains(tag)) {
-        // This tag wasn't declared nor was it seen before during this execution.
-        // Thus, this must be a new, undeclared and unconsumed output.
-        // To prevent likely user errors, enforce the limit on the number of side
-        // outputs.
-        if (outputTags.size() >= MAX_SIDE_OUTPUTS) {
-          throw new IllegalArgumentException(
-              "the number of side outputs has exceeded a limit of " + MAX_SIDE_OUTPUTS);
-        }
-        outputTags.add(tag);
-      }
-
-      outputManager.output(tag, windowedElem);
-      if (stepContext != null) {
-        stepContext.noteSideOutput(tag, windowedElem);
-      }
-    }
-
-    // Following implementations of output, outputWithTimestamp, and sideOutput
-    // are only accessible in OldDoFn.startBundle and OldDoFn.finishBundle, and will be shadowed by
-    // ProcessContext's versions in OldDoFn.processElement.
-    @Override
-    public void output(OutputT output) {
-      outputWindowedValue(output, null, null, PaneInfo.NO_FIRING);
-    }
-
-    @Override
-    public void outputWithTimestamp(OutputT output, Instant timestamp) {
-      outputWindowedValue(output, timestamp, null, PaneInfo.NO_FIRING);
-    }
-
-    @Override
-    public <T> void sideOutput(TupleTag<T> tag, T output) {
-      checkNotNull(tag, "TupleTag passed to sideOutput cannot be null");
-      sideOutputWindowedValue(tag, output, null, null, PaneInfo.NO_FIRING);
-    }
-
-    @Override
-    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
-      checkNotNull(tag, "TupleTag passed to sideOutputWithTimestamp cannot be null");
-      sideOutputWindowedValue(tag, output, timestamp, null, PaneInfo.NO_FIRING);
-    }
-
-    @Override
-    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
-        String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
-      checkNotNull(combiner, "Combiner passed to createAggregatorInternal cannot be null");
-      return aggregatorFactory.createAggregatorForDoFn(fn.getClass(), stepContext, name, combiner);
-    }
-  }
-
-  /**
-   * Returns a new {@link OldDoFn.ProcessContext} for the given element.
-   */
-  protected OldDoFn<InputT, OutputT>.ProcessContext createProcessContext(
-      WindowedValue<InputT> elem) {
-    return new DoFnProcessContext<InputT, OutputT>(fn, context, elem);
-  }
-
-  protected RuntimeException wrapUserCodeException(Throwable t) {
-    throw UserCodeException.wrapIf(!isSystemDoFn(), t);
-  }
-
-  private boolean isSystemDoFn() {
-    return fn.getClass().isAnnotationPresent(SystemDoFnInternal.class);
-  }
-
-  /**
-   * A concrete implementation of {@link OldDoFn.ProcessContext} used for
-   * running a {@link OldDoFn} over a single element.
-   *
-   * @param <InputT> the type of the {@link OldDoFn} (main) input elements
-   * @param <OutputT> the type of the {@link OldDoFn} (main) output elements
-   */
-  static class DoFnProcessContext<InputT, OutputT>
-      extends OldDoFn<InputT, OutputT>.ProcessContext {
-
-
-    final OldDoFn<InputT, OutputT> fn;
-    final DoFnContext<InputT, OutputT> context;
-    final WindowedValue<InputT> windowedValue;
-
-    public DoFnProcessContext(OldDoFn<InputT, OutputT> fn,
-                              DoFnContext<InputT, OutputT> context,
-                              WindowedValue<InputT> windowedValue) {
-      fn.super();
-      this.fn = fn;
-      this.context = context;
-      this.windowedValue = windowedValue;
-    }
-
-    @Override
-    public PipelineOptions getPipelineOptions() {
-      return context.getPipelineOptions();
-    }
-
-    @Override
-    public InputT element() {
-      return windowedValue.getValue();
-    }
-
-    @Override
-    public <T> T sideInput(PCollectionView<T> view) {
-      checkNotNull(view, "View passed to sideInput cannot be null");
-      Iterator<? extends BoundedWindow> windowIter = windows().iterator();
-      BoundedWindow window;
-      if (!windowIter.hasNext()) {
-        if (context.windowFn instanceof GlobalWindows) {
-          // TODO: Remove this once GroupByKeyOnly no longer outputs elements
-          // without windows
-          window = GlobalWindow.INSTANCE;
-        } else {
-          throw new IllegalStateException(
-              "sideInput called when main input element is not in any windows");
-        }
-      } else {
-        window = windowIter.next();
-        if (windowIter.hasNext()) {
-          throw new IllegalStateException(
-              "sideInput called when main input element is in multiple windows");
-        }
-      }
-      return context.sideInput(view, window);
-    }
-
-    @Override
-    public BoundedWindow window() {
-      if (!(fn instanceof RequiresWindowAccess)) {
-        throw new UnsupportedOperationException(
-            "window() is only available in the context of a OldDoFn marked as"
-                + "RequiresWindowAccess.");
-      }
-      return Iterables.getOnlyElement(windows());
-    }
-
-    @Override
-    public PaneInfo pane() {
-      return windowedValue.getPane();
-    }
-
-    @Override
-    public void output(OutputT output) {
-      context.outputWindowedValue(windowedValue.withValue(output));
-    }
-
-    @Override
-    public void outputWithTimestamp(OutputT output, Instant timestamp) {
-      checkTimestamp(timestamp);
-      context.outputWindowedValue(output, timestamp,
-          windowedValue.getWindows(), windowedValue.getPane());
-    }
-
-    void outputWindowedValue(
-        OutputT output,
-        Instant timestamp,
-        Collection<? extends BoundedWindow> windows,
-        PaneInfo pane) {
-      context.outputWindowedValue(output, timestamp, windows, pane);
-    }
-
-    @Override
-    public <T> void sideOutput(TupleTag<T> tag, T output) {
-      checkNotNull(tag, "Tag passed to sideOutput cannot be null");
-      context.sideOutputWindowedValue(tag, windowedValue.withValue(output));
-    }
-
-    @Override
-    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
-      checkNotNull(tag, "Tag passed to sideOutputWithTimestamp cannot be null");
-      checkTimestamp(timestamp);
-      context.sideOutputWindowedValue(
-          tag, output, timestamp, windowedValue.getWindows(), windowedValue.getPane());
-    }
-
-    @Override
-    public Instant timestamp() {
-      return windowedValue.getTimestamp();
-    }
-
-    public Collection<? extends BoundedWindow> windows() {
-      return windowedValue.getWindows();
-    }
-
-    private void checkTimestamp(Instant timestamp) {
-      if (timestamp.isBefore(windowedValue.getTimestamp().minus(fn.getAllowedTimestampSkew()))) {
-        throw new IllegalArgumentException(String.format(
-            "Cannot output with timestamp %s. Output timestamps must be no earlier than the "
-            + "timestamp of the current input (%s) minus the allowed skew (%s). See the "
-            + "OldDoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.",
-            timestamp, windowedValue.getTimestamp(),
-            PeriodFormat.getDefault().print(fn.getAllowedTimestampSkew().toPeriod())));
-      }
-    }
-
-    @Override
-    public WindowingInternals<InputT, OutputT> windowingInternals() {
-      return new WindowingInternals<InputT, OutputT>() {
-        @Override
-        public void outputWindowedValue(OutputT output, Instant timestamp,
-            Collection<? extends BoundedWindow> windows, PaneInfo pane) {
-          context.outputWindowedValue(output, timestamp, windows, pane);
-        }
-
-        @Override
-        public Collection<? extends BoundedWindow> windows() {
-          return windowedValue.getWindows();
-        }
-
-        @Override
-        public PaneInfo pane() {
-          return windowedValue.getPane();
-        }
-
-        @Override
-        public TimerInternals timerInternals() {
-          return context.stepContext.timerInternals();
-        }
-
-        @Override
-        public <T> void writePCollectionViewData(
-            TupleTag<?> tag,
-            Iterable<WindowedValue<T>> data,
-            Coder<T> elemCoder) throws IOException {
-          @SuppressWarnings("unchecked")
-          Coder<BoundedWindow> windowCoder = (Coder<BoundedWindow>) context.windowFn.windowCoder();
-
-          context.stepContext.writePCollectionViewData(
-              tag, data, IterableCoder.of(WindowedValue.getFullCoder(elemCoder, windowCoder)),
-              window(), windowCoder);
-        }
-
-        @Override
-        public StateInternals<?> stateInternals() {
-          return context.stepContext.stateInternals();
-        }
-
-        @Override
-        public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
-          return context.sideInput(view, mainInputWindow);
-        }
-      };
-    }
-
-    @Override
-    protected <AggregatorInputT, AggregatorOutputT> Aggregator<AggregatorInputT, AggregatorOutputT>
-        createAggregatorInternal(
-            String name, CombineFn<AggregatorInputT, ?, AggregatorOutputT> combiner) {
-      return context.createAggregatorInternal(name, combiner);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a62e5018/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunners.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunners.java
deleted file mode 100644
index c4df7b2..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunners.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import java.util.List;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.DoFnRunner.ReduceFnExecutor;
-import org.apache.beam.sdk.util.ExecutionContext.StepContext;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.TupleTag;
-
-/**
- * Static utility methods that provide {@link DoFnRunner} implementations.
- */
-public class DoFnRunners {
-  /**
-   * Information about how to create output receivers and output to them.
-   */
-  public interface OutputManager {
-    /**
-     * Outputs a single element to the receiver indicated by the given {@link TupleTag}.
-     */
-    public <T> void output(TupleTag<T> tag, WindowedValue<T> output);
-  }
-
-  /**
-   * Returns a basic implementation of {@link DoFnRunner} that works for most {@link OldDoFn DoFns}.
-   *
-   * <p>It invokes {@link OldDoFn#processElement} for each input.
-   */
-  public static <InputT, OutputT> DoFnRunner<InputT, OutputT> simpleRunner(
-      PipelineOptions options,
-      OldDoFn<InputT, OutputT> fn,
-      SideInputReader sideInputReader,
-      OutputManager outputManager,
-      TupleTag<OutputT> mainOutputTag,
-      List<TupleTag<?>> sideOutputTags,
-      StepContext stepContext,
-      AggregatorFactory aggregatorFactory,
-      WindowingStrategy<?, ?> windowingStrategy) {
-    return new SimpleDoFnRunner<>(
-        options,
-        fn,
-        sideInputReader,
-        outputManager,
-        mainOutputTag,
-        sideOutputTags,
-        stepContext,
-        aggregatorFactory,
-        windowingStrategy);
-  }
-
-  /**
-   * Returns an implementation of {@link DoFnRunner} that handles late data dropping.
-   *
-   * <p>It drops elements from expired windows before they reach the underlying {@link OldDoFn}.
-   */
-  public static <K, InputT, OutputT, W extends BoundedWindow>
-      DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> lateDataDroppingRunner(
-          PipelineOptions options,
-          ReduceFnExecutor<K, InputT, OutputT, W> reduceFnExecutor,
-          SideInputReader sideInputReader,
-          OutputManager outputManager,
-          TupleTag<KV<K, OutputT>> mainOutputTag,
-          List<TupleTag<?>> sideOutputTags,
-          StepContext stepContext,
-          AggregatorFactory aggregatorFactory,
-          WindowingStrategy<?, W> windowingStrategy) {
-    DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> simpleDoFnRunner =
-        simpleRunner(
-            options,
-            reduceFnExecutor.asDoFn(),
-            sideInputReader,
-            outputManager,
-            mainOutputTag,
-            sideOutputTags,
-            stepContext,
-            aggregatorFactory,
-            windowingStrategy);
-    return new LateDataDroppingDoFnRunner<>(
-        simpleDoFnRunner,
-        windowingStrategy,
-        stepContext.timerInternals(),
-        reduceFnExecutor.getDroppedDueToLatenessAggregator());
-  }
-
-
-  public static <InputT, OutputT> DoFnRunner<InputT, OutputT> createDefault(
-      PipelineOptions options,
-      OldDoFn<InputT, OutputT> doFn,
-      SideInputReader sideInputReader,
-      OutputManager outputManager,
-      TupleTag<OutputT> mainOutputTag,
-      List<TupleTag<?>> sideOutputTags,
-      StepContext stepContext,
-      AggregatorFactory aggregatorFactory,
-      WindowingStrategy<?, ?> windowingStrategy) {
-    if (doFn instanceof ReduceFnExecutor) {
-      @SuppressWarnings("rawtypes")
-      ReduceFnExecutor fn = (ReduceFnExecutor) doFn;
-      @SuppressWarnings({"unchecked", "cast", "rawtypes"})
-      DoFnRunner<InputT, OutputT> runner = (DoFnRunner<InputT, OutputT>) lateDataDroppingRunner(
-          options,
-          fn,
-          sideInputReader,
-          outputManager,
-          (TupleTag) mainOutputTag,
-          sideOutputTags,
-          stepContext,
-          aggregatorFactory,
-          (WindowingStrategy) windowingStrategy);
-      return runner;
-    }
-    return simpleRunner(
-        options,
-        doFn,
-        sideInputReader,
-        outputManager,
-        mainOutputTag,
-        sideOutputTags,
-        stepContext,
-        aggregatorFactory,
-        windowingStrategy);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a62e5018/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsDoFn.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsDoFn.java
deleted file mode 100644
index f386dfb..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsDoFn.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.state.StateInternalsFactory;
-import org.apache.beam.sdk.values.KV;
-
-/**
- * {@link OldDoFn} that merges windows and groups elements in those windows, optionally
- * combining values.
- *
- * @param <K> key type
- * @param <InputT> input value element type
- * @param <OutputT> output value element type
- * @param <W> window type
- */
-@SystemDoFnInternal
-public abstract class GroupAlsoByWindowsDoFn<K, InputT, OutputT, W extends BoundedWindow>
-    extends OldDoFn<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>> {
-  public static final String DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER = "DroppedDueToClosedWindow";
-  public static final String DROPPED_DUE_TO_LATENESS_COUNTER = "DroppedDueToLateness";
-
-  protected final Aggregator<Long, Long> droppedDueToClosedWindow =
-      createAggregator(DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, new Sum.SumLongFn());
-  protected final Aggregator<Long, Long> droppedDueToLateness =
-      createAggregator(DROPPED_DUE_TO_LATENESS_COUNTER, new Sum.SumLongFn());
-
-  /**
-   * Create the default {@link GroupAlsoByWindowsDoFn}, which uses window sets to implement the
-   * grouping.
-   *
-   * @param windowingStrategy The window function and trigger to use for grouping
-   * @param inputCoder the input coder to use
-   */
-  public static <K, V, W extends BoundedWindow>
-      GroupAlsoByWindowsDoFn<K, V, Iterable<V>, W> createDefault(
-          WindowingStrategy<?, W> windowingStrategy,
-          StateInternalsFactory<K> stateInternalsFactory,
-          Coder<V> inputCoder) {
-    return new GroupAlsoByWindowsViaOutputBufferDoFn<>(
-        windowingStrategy, stateInternalsFactory, SystemReduceFn.<K, V, W>buffering(inputCoder));
-  }
-}


Mime
View raw message