beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [03/12] incubator-beam git commit: Move some easy stuff into runners/core-java
Date Wed, 22 Jun 2016 02:22:17 GMT
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WatermarkHold.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WatermarkHold.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WatermarkHold.java
deleted file mode 100644
index 14ec082..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WatermarkHold.java
+++ /dev/null
@@ -1,536 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
-import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
-import org.apache.beam.sdk.util.state.MergingStateAccessor;
-import org.apache.beam.sdk.util.state.ReadableState;
-import org.apache.beam.sdk.util.state.StateMerging;
-import org.apache.beam.sdk.util.state.StateTag;
-import org.apache.beam.sdk.util.state.StateTags;
-import org.apache.beam.sdk.util.state.WatermarkHoldState;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-import java.io.Serializable;
-
-import javax.annotation.Nullable;
-
-/**
- * Implements the logic to hold the output watermark for a computation back
- * until it has seen all the elements it needs based on the input watermark for the
- * computation.
- *
- * <p>The backend ensures the output watermark can never progress beyond the
- * input watermark for a computation. GroupAlsoByWindows computations may add a 'hold'
- * to the output watermark in order to prevent it progressing beyond a time within a window.
- * The hold will be 'cleared' when the associated pane is emitted.
- *
- * <p>This class is only intended for use by {@link ReduceFnRunner}. The two evolve together and
- * will likely break any other uses.
- *
- * @param <W> The kind of {@link BoundedWindow} the hold is for.
- */
-class WatermarkHold<W extends BoundedWindow> implements Serializable {
-  /**
-   * Return tag for state containing the output watermark hold
-   * used for elements.
-   */
-  public static <W extends BoundedWindow>
-      StateTag<Object, WatermarkHoldState<W>> watermarkHoldTagForOutputTimeFn(
-          OutputTimeFn<? super W> outputTimeFn) {
-    return StateTags.<Object, WatermarkHoldState<W>>makeSystemTagInternal(
-        StateTags.<W>watermarkStateInternal("hold", outputTimeFn));
-  }
-
-  /**
-   * Tag for state containing end-of-window and garbage collection output watermark holds.
-   * (We can't piggy-back on the data hold state since the outputTimeFn may be
-   * {@link OutputTimeFns#outputAtLatestInputTimestamp()}, in which case every pane will
-   * would take the end-of-window time as its element time.)
-   */
-  @VisibleForTesting
-  public static final StateTag<Object, WatermarkHoldState<BoundedWindow>> EXTRA_HOLD_TAG =
-      StateTags.makeSystemTagInternal(StateTags.watermarkStateInternal(
-          "extra", OutputTimeFns.outputAtEarliestInputTimestamp()));
-
-  private final TimerInternals timerInternals;
-  private final WindowingStrategy<?, W> windowingStrategy;
-  private final StateTag<Object, WatermarkHoldState<W>> elementHoldTag;
-
-  public WatermarkHold(TimerInternals timerInternals, WindowingStrategy<?, W> windowingStrategy) {
-    this.timerInternals = timerInternals;
-    this.windowingStrategy = windowingStrategy;
-    this.elementHoldTag = watermarkHoldTagForOutputTimeFn(windowingStrategy.getOutputTimeFn());
-  }
-
-  /**
-   * Add a hold to prevent the output watermark progressing beyond the (possibly adjusted) timestamp
-   * of the element in {@code context}. We allow the actual hold time to be shifted later by
-   * {@link OutputTimeFn#assignOutputTime}, but no further than the end of the window. The hold will
-   * remain until cleared by {@link #extractAndRelease}. Return the timestamp at which the hold
-   * was placed, or {@literal null} if no hold was placed.
-   *
-   * <p>In the following we'll write {@code E} to represent an element's timestamp after passing
-   * through the window strategy's output time function, {@code IWM} for the local input watermark,
-   * {@code OWM} for the local output watermark, and {@code GCWM} for the garbage collection
-   * watermark (which is at {@code IWM - getAllowedLateness}). Time progresses from left to right,
-   * and we write {@code [ ... ]} to denote a bounded window with implied lower bound.
-   *
-   * <p>Note that the GCWM will be the same as the IWM if {@code getAllowedLateness}
-   * is {@code ZERO}.
-   *
-   * <p>Here are the cases we need to handle. They are conceptually considered in the
-   * sequence written since if getAllowedLateness is ZERO the GCWM is the same as the IWM.
-   * <ol>
-   * <li>(Normal)
-   * <pre>
-   *          |
-   *      [   | E        ]
-   *          |
-   *         IWM
-   * </pre>
-   * This is, hopefully, the common and happy case. The element is locally on-time and can
-   * definitely make it to an {@code ON_TIME} pane which we can still set an end-of-window timer
-   * for. We place an element hold at E, which may contribute to the {@code ON_TIME} pane's
-   * timestamp (depending on the output time function). Thus the OWM will not proceed past E
-   * until the next pane fires.
-   *
-   * <li>(Discard - no target window)
-   * <pre>
-   *                       |                            |
-   *      [     E        ] |                            |
-   *                       |                            |
-   *                     GCWM  <-getAllowedLateness->  IWM
-   * </pre>
-   * The element is very locally late. The window has been garbage collected, thus there
-   * is no target pane E could be assigned to. We discard E.
-   *
-   * <li>(Unobservably late)
-   * <pre>
-   *          |    |
-   *      [   | E  |     ]
-   *          |    |
-   *         OWM  IWM
-   * </pre>
-   * The element is locally late, however we can still treat this case as for 'Normal' above
-   * since the IWM has not yet passed the end of the window and the element is ahead of the
-   * OWM. In effect, we get to 'launder' the locally late element and consider it as locally
-   * on-time because no downstream computation can observe the difference.
-   *
-   * <li>(Maybe late 1)
-   * <pre>
-   *          |            |
-   *      [   | E        ] |
-   *          |            |
-   *         OWM          IWM
-   * </pre>
-   * The end-of-window timer may have already fired for this window, and thus an {@code ON_TIME}
-   * pane may have already been emitted. However, if timer firings have been delayed then it
-   * is possible the {@code ON_TIME} pane has not yet been emitted. We can't place an element
-   * hold since we can't be sure if it will be cleared promptly. Thus this element *may* find
-   * its way into an {@code ON_TIME} pane, but if so it will *not* contribute to that pane's
-   * timestamp. We may however set a garbage collection hold if required.
-   *
-   * <li>(Maybe late 2)
-   * <pre>
-   *               |   |
-   *      [     E  |   | ]
-   *               |   |
-   *              OWM IWM
-   * </pre>
-   * The end-of-window timer has not yet fired, so this element may still appear in an
-   * {@code ON_TIME} pane. However the element is too late to contribute to the output
-   * watermark hold, and thus won't contribute to the pane's timestamp. We can still place an
-   * end-of-window hold.
-   *
-   * <li>(Maybe late 3)
-   * <pre>
-   *               |       |
-   *      [     E  |     ] |
-   *               |       |
-   *              OWM     IWM
-   * </pre>
-   * As for the (Maybe late 2) case, however we don't even know if the end-of-window timer
-   * has already fired, or it is about to fire. We can place only the garbage collection hold,
-   * if required.
-   *
-   * <li>(Definitely late)
-   * <pre>
-   *                       |   |
-   *      [     E        ] |   |
-   *                       |   |
-   *                      OWM IWM
-   * </pre>
-   * The element is definitely too late to make an {@code ON_TIME} pane. We are too late to
-   * place an end-of-window hold. We can still place a garbage collection hold if required.
-   *
-   * </ol>
-   */
-  @Nullable
-  public Instant addHolds(ReduceFn<?, ?, ?, W>.ProcessValueContext context) {
-    Instant hold = addElementHold(context);
-    if (hold == null) {
-      hold = addEndOfWindowOrGarbageCollectionHolds(context, false/*paneIsEmpty*/);
-    }
-    return hold;
-  }
-
-  /**
-   * Return {@code timestamp}, possibly shifted forward in time according to the window
-   * strategy's output time function.
-   */
-  private Instant shift(Instant timestamp, W window) {
-    Instant shifted = windowingStrategy.getOutputTimeFn().assignOutputTime(timestamp, window);
-    Preconditions.checkState(!shifted.isBefore(timestamp),
-        "OutputTimeFn moved element from %s to earlier time %s for window %s",
-        timestamp, shifted, window);
-    Preconditions.checkState(timestamp.isAfter(window.maxTimestamp())
-            || !shifted.isAfter(window.maxTimestamp()),
-        "OutputTimeFn moved element from %s to %s which is beyond end of "
-            + "window %s",
-        timestamp, shifted, window);
-
-    return shifted;
-  }
-
-  /**
-   * Attempt to add an 'element hold'. Return the {@link Instant} at which the hold was
-   * added (ie the element timestamp plus any forward shift requested by the
-   * {@link WindowingStrategy#getOutputTimeFn}), or {@literal null} if no hold was added.
-   * The hold is only added if both:
-   * <ol>
-   * <li>The backend will be able to respect it. In other words the output watermark cannot
-   * be ahead of the proposed hold time.
-   * <li>A timer will be set (by {@link ReduceFnRunner}) to clear the hold by the end of the
-   * window. In other words the input watermark cannot be ahead of the end of the window.
-   * </ol>
-   * The hold ensures the pane which incorporates the element is will not be considered late by
-   * any downstream computation when it is eventually emitted.
-   */
-  @Nullable
-  private Instant addElementHold(ReduceFn<?, ?, ?, W>.ProcessValueContext context) {
-    // Give the window function a chance to move the hold timestamp forward to encourage progress.
-    // (A later hold implies less impediment to the output watermark making progress, which in
-    // turn encourages end-of-window triggers to fire earlier in following computations.)
-    Instant elementHold = shift(context.timestamp(), context.window());
-
-    Instant outputWM = timerInternals.currentOutputWatermarkTime();
-    Instant inputWM = timerInternals.currentInputWatermarkTime();
-
-    String which;
-    boolean tooLate;
-    // TODO: These case labels could be tightened.
-    // See the case analysis in addHolds above for the motivation.
-    if (outputWM != null && elementHold.isBefore(outputWM)) {
-      which = "too late to effect output watermark";
-      tooLate = true;
-    } else if (context.window().maxTimestamp().isBefore(inputWM)) {
-      which = "too late for end-of-window timer";
-      tooLate = true;
-    } else {
-      which = "on time";
-      tooLate = false;
-      Preconditions.checkState(!elementHold.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE),
-                               "Element hold %s is beyond end-of-time", elementHold);
-      context.state().access(elementHoldTag).add(elementHold);
-    }
-    WindowTracing.trace(
-        "WatermarkHold.addHolds: element hold at {} is {} for "
-        + "key:{}; window:{}; inputWatermark:{}; outputWatermark:{}",
-        elementHold, which, context.key(), context.window(), inputWM,
-        outputWM);
-
-    return tooLate ? null : elementHold;
-  }
-
-  /**
-   * Add an end-of-window hold or, if too late for that, a garbage collection hold (if required).
-   * Return the {@link Instant} at which hold was added, or {@literal null} if no hold was added.
-   */
-  @Nullable
-  private Instant addEndOfWindowOrGarbageCollectionHolds(
-      ReduceFn<?, ?, ?, W>.Context context, boolean paneIsEmpty) {
-    Instant hold = addEndOfWindowHold(context, paneIsEmpty);
-    if (hold == null) {
-      hold = addGarbageCollectionHold(context, paneIsEmpty);
-    }
-    return hold;
-  }
-
-  /**
-   * Attempt to add an 'end-of-window hold'. Return the {@link Instant} at which the hold was added
-   * (ie the end of window time), or {@literal null} if no end of window hold is possible and we
-   * should fallback to a garbage collection hold.
-   *
-   * <p>We only add the hold if we can be sure a timer will be set (by {@link ReduceFnRunner})
-   * to clear it. In other words, the input watermark cannot be ahead of the end of window time.
-   *
-   * <p>An end-of-window hold is added in two situations:
-   * <ol>
-   * <li>An incoming element came in behind the output watermark (so we are too late for placing
-   * the usual element hold), but it may still be possible to include the element in an
-   * {@link Timing#ON_TIME} pane. We place the end of window hold to ensure that pane will
-   * not be considered late by any downstream computation.
-   * <li>We guarantee an {@link Timing#ON_TIME} pane will be emitted for all windows which saw at
-   * least one element, even if that {@link Timing#ON_TIME} pane is empty. Thus when elements in
-   * a pane are processed due to a fired trigger we must set both an end of window timer and an end
-   * of window hold. Again, the hold ensures the {@link Timing#ON_TIME} pane will not be considered
-   * late by any downstream computation.
-   * </ol>
-   */
-  @Nullable
-  private Instant addEndOfWindowHold(ReduceFn<?, ?, ?, W>.Context context, boolean paneIsEmpty) {
-    Instant outputWM = timerInternals.currentOutputWatermarkTime();
-    Instant inputWM = timerInternals.currentInputWatermarkTime();
-    Instant eowHold = context.window().maxTimestamp();
-
-    if (eowHold.isBefore(inputWM)) {
-      WindowTracing.trace(
-          "WatermarkHold.addEndOfWindowHold: end-of-window hold at {} is too late for "
-              + "end-of-window timer for key:{}; window:{}; inputWatermark:{}; outputWatermark:{}",
-          eowHold, context.key(), context.window(), inputWM, outputWM);
-      return null;
-    }
-
-    Preconditions.checkState(outputWM == null || !eowHold.isBefore(outputWM),
-        "End-of-window hold %s cannot be before output watermark %s",
-        eowHold, outputWM);
-    Preconditions.checkState(!eowHold.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE),
-        "End-of-window hold %s is beyond end-of-time", eowHold);
-    // If paneIsEmpty then this hold is just for empty ON_TIME panes, so we want to keep
-    // the hold away from the combining function in elementHoldTag.
-    // However if !paneIsEmpty then it could make sense  to use the elementHoldTag here.
-    // Alas, onMerge is forced to add an end of window or garbage collection hold without
-    // knowing whether an element hold is already in place (stopping to check is too expensive).
-    // This it would end up adding an element hold at the end of the window which could
-    // upset the elementHoldTag combining function.
-    context.state().access(EXTRA_HOLD_TAG).add(eowHold);
-    WindowTracing.trace(
-        "WatermarkHold.addEndOfWindowHold: end-of-window hold at {} is on time for "
-            + "key:{}; window:{}; inputWatermark:{}; outputWatermark:{}",
-        eowHold, context.key(), context.window(), inputWM, outputWM);
-    return eowHold;
-  }
-
-  /**
-   * Attempt to add a 'garbage collection hold' if it is required. Return the {@link Instant} at
-   * which the hold was added (ie the end of window time plus allowed lateness),
-   * or {@literal null} if no hold was added.
-   *
-   * <p>We only add the hold if it is distinct from what would be added by
-   * {@link #addEndOfWindowHold}. In other words, {@link WindowingStrategy#getAllowedLateness}
-   * must be non-zero.
-   *
-   * <p>A garbage collection hold is added in two situations:
-   * <ol>
-   * <li>An incoming element came in behind the output watermark, and was too late for placing
-   * the usual element hold or an end of window hold. Place the garbage collection hold so that
-   * we can guarantee when the pane is finally triggered its output will not be dropped due to
-   * excessive lateness by any downstream computation.
-   * <li>The {@link WindowingStrategy#getClosingBehavior()} is
-   * {@link ClosingBehavior#FIRE_ALWAYS}, and thus we guarantee a final pane will be emitted
-   * for all windows which saw at least one element. Again, the garbage collection hold guarantees
-   * that any empty final pane can be given a timestamp which will not be considered beyond
-   * allowed lateness by any downstream computation.
-   * </ol>
-   *
-   * <p>We use {@code paneIsEmpty} to distinguish cases 1 and 2.
-   */
-  @Nullable
-  private Instant addGarbageCollectionHold(
-      ReduceFn<?, ?, ?, W>.Context context, boolean paneIsEmpty) {
-    Instant outputWM = timerInternals.currentOutputWatermarkTime();
-    Instant inputWM = timerInternals.currentInputWatermarkTime();
-    Instant eow = context.window().maxTimestamp();
-    Instant gcHold = eow.plus(windowingStrategy.getAllowedLateness());
-
-    if (!windowingStrategy.getAllowedLateness().isLongerThan(Duration.ZERO)) {
-      WindowTracing.trace(
-          "WatermarkHold.addGarbageCollectionHold: garbage collection hold at {} is unnecessary "
-              + "since no allowed lateness for key:{}; window:{}; inputWatermark:{}; "
-              + "outputWatermark:{}",
-          gcHold, context.key(), context.window(), inputWM, outputWM);
-      return null;
-    }
-
-    if (paneIsEmpty && context.windowingStrategy().getClosingBehavior()
-        == ClosingBehavior.FIRE_IF_NON_EMPTY) {
-      WindowTracing.trace(
-          "WatermarkHold.addGarbageCollectionHold: garbage collection hold at {} is unnecessary "
-              + "since empty pane and FIRE_IF_NON_EMPTY for key:{}; window:{}; inputWatermark:{}; "
-              + "outputWatermark:{}",
-          gcHold, context.key(), context.window(), inputWM, outputWM);
-      return null;
-    }
-
-    Preconditions.checkState(!gcHold.isBefore(inputWM),
-        "Garbage collection hold %s cannot be before input watermark %s",
-        gcHold, inputWM);
-    Preconditions.checkState(!gcHold.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE),
-        "Garbage collection hold %s is beyond end-of-time", gcHold);
-    // Same EXTRA_HOLD_TAG vs elementHoldTag discussion as in addEndOfWindowHold above.
-    context.state().access(EXTRA_HOLD_TAG).add(gcHold);
-
-    WindowTracing.trace(
-        "WatermarkHold.addGarbageCollectionHold: garbage collection hold at {} is on time for "
-            + "key:{}; window:{}; inputWatermark:{}; outputWatermark:{}",
-        gcHold, context.key(), context.window(), inputWM, outputWM);
-    return gcHold;
-  }
-
-  /**
-   * Prefetch watermark holds in preparation for merging.
-   */
-  public void prefetchOnMerge(MergingStateAccessor<?, W> state) {
-    StateMerging.prefetchWatermarks(state, elementHoldTag);
-  }
-
-  /**
-   * Updates the watermark hold when windows merge if it is possible the merged value does
-   * not equal all of the existing holds. For example, if the new window implies a later
-   * watermark hold, then earlier holds may be released.
-   */
-  public void onMerge(ReduceFn<?, ?, ?, W>.OnMergeContext context) {
-    WindowTracing.debug("WatermarkHold.onMerge: for key:{}; window:{}; inputWatermark:{}; "
-            + "outputWatermark:{}",
-        context.key(), context.window(), timerInternals.currentInputWatermarkTime(),
-        timerInternals.currentOutputWatermarkTime());
-    StateMerging.mergeWatermarks(context.state(), elementHoldTag, context.window());
-    // If we had a cheap way to determine if we have an element hold then we could
-    // avoid adding an unnecessary end-of-window or garbage collection hold.
-    // Simply reading the above merged watermark would impose an additional read for the
-    // common case that the active window has just one underlying state address window and
-    // the hold depends on the min of the element timestamps.
-    // At least one merged window must be non-empty for the merge to have been triggered.
-    StateMerging.clear(context.state(), EXTRA_HOLD_TAG);
-    addEndOfWindowOrGarbageCollectionHolds(context, false /*paneIsEmpty*/);
-  }
-
-  /**
-   * Result of {@link #extractAndRelease}.
-   */
-  public static class OldAndNewHolds {
-    public final Instant oldHold;
-    @Nullable
-    public final Instant newHold;
-
-    public OldAndNewHolds(Instant oldHold, @Nullable Instant newHold) {
-      this.oldHold = oldHold;
-      this.newHold = newHold;
-    }
-  }
-
-  /**
-   * Return (a future for) the earliest hold for {@code context}. Clear all the holds after
-   * reading, but add/restore an end-of-window or garbage collection hold if required.
-   *
-   * <p>The returned timestamp is the output timestamp according to the {@link OutputTimeFn}
-   * from the windowing strategy of this {@link WatermarkHold}, combined across all the non-late
-   * elements in the current pane. If there is no such value the timestamp is the end
-   * of the window.
-   */
-  public ReadableState<OldAndNewHolds> extractAndRelease(
-      final ReduceFn<?, ?, ?, W>.Context context, final boolean isFinished) {
-    WindowTracing.debug(
-        "WatermarkHold.extractAndRelease: for key:{}; window:{}; inputWatermark:{}; "
-            + "outputWatermark:{}",
-        context.key(), context.window(), timerInternals.currentInputWatermarkTime(),
-        timerInternals.currentOutputWatermarkTime());
-    final WatermarkHoldState<W> elementHoldState = context.state().access(elementHoldTag);
-    final WatermarkHoldState<BoundedWindow> extraHoldState = context.state().access(EXTRA_HOLD_TAG);
-    return new ReadableState<OldAndNewHolds>() {
-      @Override
-      public ReadableState<OldAndNewHolds> readLater() {
-        elementHoldState.readLater();
-        extraHoldState.readLater();
-        return this;
-      }
-
-      @Override
-      public OldAndNewHolds read() {
-        // Read both the element and extra holds.
-        Instant elementHold = elementHoldState.read();
-        Instant extraHold = extraHoldState.read();
-        Instant oldHold;
-        // Find the minimum, accounting for null.
-        if (elementHold == null) {
-          oldHold = extraHold;
-        } else if (extraHold == null) {
-          oldHold = elementHold;
-        } else if (elementHold.isBefore(extraHold)) {
-          oldHold = elementHold;
-        } else {
-          oldHold = extraHold;
-        }
-        if (oldHold == null || oldHold.isAfter(context.window().maxTimestamp())) {
-          // If no hold (eg because all elements came in behind the output watermark), or
-          // the hold was for garbage collection, take the end of window as the result.
-          WindowTracing.debug(
-              "WatermarkHold.extractAndRelease.read: clipping from {} to end of window "
-              + "for key:{}; window:{}",
-              oldHold, context.key(), context.window());
-          oldHold = context.window().maxTimestamp();
-        }
-        WindowTracing.debug("WatermarkHold.extractAndRelease.read: clearing for key:{}; window:{}",
-            context.key(), context.window());
-
-        // Clear the underlying state to allow the output watermark to progress.
-        elementHoldState.clear();
-        extraHoldState.clear();
-
-        @Nullable Instant newHold = null;
-        if (!isFinished) {
-          // Only need to leave behind an end-of-window or garbage collection hold
-          // if future elements will be processed.
-          newHold = addEndOfWindowOrGarbageCollectionHolds(context, true /*paneIsEmpty*/);
-        }
-
-        return new OldAndNewHolds(oldHold, newHold);
-      }
-    };
-  }
-
-  /**
-   * Clear any remaining holds.
-   */
-  public void clearHolds(ReduceFn<?, ?, ?, W>.Context context) {
-    WindowTracing.debug(
-        "WatermarkHold.clearHolds: For key:{}; window:{}; inputWatermark:{}; outputWatermark:{}",
-        context.key(), context.window(), timerInternals.currentInputWatermarkTime(),
-        timerInternals.currentOutputWatermarkTime());
-    context.state().access(elementHoldTag).clear();
-    context.state().access(EXTRA_HOLD_TAG).clear();
-  }
-
-  /**
-   * Return the current data hold, or null if none. Does not clear. For debugging only.
-   */
-  @Nullable
-  public Instant getDataCurrent(ReduceFn<?, ?, ?, W>.Context context) {
-    return context.state().access(elementHoldTag).read();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BatchTimerInternalsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BatchTimerInternalsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BatchTimerInternalsTest.java
deleted file mode 100644
index 3e1528f..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BatchTimerInternalsTest.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
-import org.apache.beam.sdk.util.state.StateNamespace;
-import org.apache.beam.sdk.util.state.StateNamespaceForTest;
-
-import org.joda.time.Instant;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.MockitoAnnotations;
-
-/**
- * Tests for {@link BatchTimerInternals}.
- */
-@RunWith(JUnit4.class)
-public class BatchTimerInternalsTest {
-
-  private static final StateNamespace NS1 = new StateNamespaceForTest("NS1");
-
-  @Mock
-  private ReduceFnRunner<?, ?, ?, ?> mockRunner;
-
-  @Before
-  public void setUp() {
-    MockitoAnnotations.initMocks(this);
-  }
-
-  @Test
-  public void testFiringTimers() throws Exception {
-    BatchTimerInternals underTest = new BatchTimerInternals(new Instant(0));
-    TimerData processingTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME);
-    TimerData processingTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.PROCESSING_TIME);
-
-    underTest.setTimer(processingTime1);
-    underTest.setTimer(processingTime2);
-
-    underTest.advanceProcessingTime(mockRunner, new Instant(20));
-    Mockito.verify(mockRunner).onTimer(processingTime1);
-    Mockito.verifyNoMoreInteractions(mockRunner);
-
-    // Advancing just a little shouldn't refire
-    underTest.advanceProcessingTime(mockRunner, new Instant(21));
-    Mockito.verifyNoMoreInteractions(mockRunner);
-
-    // Adding the timer and advancing a little should refire
-    underTest.setTimer(processingTime1);
-    Mockito.verify(mockRunner).onTimer(processingTime1);
-    underTest.advanceProcessingTime(mockRunner, new Instant(21));
-    Mockito.verifyNoMoreInteractions(mockRunner);
-
-    // And advancing the rest of the way should still have the other timer
-    underTest.advanceProcessingTime(mockRunner, new Instant(30));
-    Mockito.verify(mockRunner).onTimer(processingTime2);
-    Mockito.verifyNoMoreInteractions(mockRunner);
-  }
-
-  @Test
-  public void testTimerOrdering() throws Exception {
-    BatchTimerInternals underTest = new BatchTimerInternals(new Instant(0));
-    TimerData watermarkTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.EVENT_TIME);
-    TimerData processingTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME);
-    TimerData watermarkTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.EVENT_TIME);
-    TimerData processingTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.PROCESSING_TIME);
-
-    underTest.setTimer(processingTime1);
-    underTest.setTimer(watermarkTime1);
-    underTest.setTimer(processingTime2);
-    underTest.setTimer(watermarkTime2);
-
-    underTest.advanceInputWatermark(mockRunner, new Instant(30));
-    Mockito.verify(mockRunner).onTimer(watermarkTime1);
-    Mockito.verify(mockRunner).onTimer(watermarkTime2);
-    Mockito.verifyNoMoreInteractions(mockRunner);
-
-    underTest.advanceProcessingTime(mockRunner, new Instant(30));
-    Mockito.verify(mockRunner).onTimer(processingTime1);
-    Mockito.verify(mockRunner).onTimer(processingTime2);
-    Mockito.verifyNoMoreInteractions(mockRunner);
-  }
-
-  @Test
-  public void testDeduplicate() throws Exception {
-    BatchTimerInternals underTest = new BatchTimerInternals(new Instant(0));
-    TimerData watermarkTime = TimerData.of(NS1, new Instant(19), TimeDomain.EVENT_TIME);
-    TimerData processingTime = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME);
-    underTest.setTimer(watermarkTime);
-    underTest.setTimer(watermarkTime);
-    underTest.setTimer(processingTime);
-    underTest.setTimer(processingTime);
-    underTest.advanceProcessingTime(mockRunner, new Instant(20));
-    underTest.advanceInputWatermark(mockRunner, new Instant(20));
-
-    Mockito.verify(mockRunner).onTimer(processingTime);
-    Mockito.verify(mockRunner).onTimer(watermarkTime);
-    Mockito.verifyNoMoreInteractions(mockRunner);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java
deleted file mode 100644
index f653f49..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java
+++ /dev/null
@@ -1,619 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.hasSize;
-import static org.junit.Assert.assertThat;
-
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.DoFnTester;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.Sessions;
-import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.apache.beam.sdk.values.TupleTag;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-
-/**
- * Properties of {@link GroupAlsoByWindowsDoFn}.
- *
- * <p>Some properties may not hold of some implementations, due to restrictions on the context
- * in which the implementation is applicable. For example, some {@code GroupAlsoByWindows} may not
- * support merging windows.
- */
-public class GroupAlsoByWindowsProperties {
-
-  /**
-   * A factory of {@link GroupAlsoByWindowsDoFn} so that the various properties can provide
-   * the appropriate windowing strategy under test.
-   */
-  public interface GroupAlsoByWindowsDoFnFactory<K, InputT, OutputT> {
-    <W extends BoundedWindow> GroupAlsoByWindowsDoFn<K, InputT, OutputT, W>
-    forStrategy(WindowingStrategy<?, W> strategy);
-  }
-
-  /**
-   * Tests that for empty input and the given {@link WindowingStrategy}, the provided GABW
-   * implementation produces no output.
-   *
-   * <p>The input type is deliberately left as a wildcard, since it is not relevant.
-   */
-  public static <K, InputT, OutputT> void emptyInputEmptyOutput(
-      GroupAlsoByWindowsDoFnFactory<K, InputT, OutputT> gabwFactory)
-          throws Exception {
-
-    WindowingStrategy<?, IntervalWindow> windowingStrategy =
-        WindowingStrategy.of(FixedWindows.of(Duration.millis(10)));
-
-    DoFnTester<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>> result = runGABW(
-        gabwFactory,
-        windowingStrategy,
-        (K) null, // key should never be used
-        Collections.<WindowedValue<InputT>>emptyList());
-
-    assertThat(result.peekOutputElements(), hasSize(0));
-  }
-
-  /**
-   * Tests that for a simple sequence of elements on the same key, the given GABW implementation
-   * correctly groups them according to fixed windows.
-   */
-  public static void groupsElementsIntoFixedWindows(
-      GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory)
-          throws Exception {
-
-    WindowingStrategy<?, IntervalWindow> windowingStrategy =
-        WindowingStrategy.of(FixedWindows.of(Duration.millis(10)));
-
-    DoFnTester<KV<String, Iterable<WindowedValue<String>>>, KV<String, Iterable<String>>> result =
-        runGABW(gabwFactory, windowingStrategy, "key",
-            WindowedValue.of(
-                "v1",
-                new Instant(1),
-                Arrays.asList(window(0, 10)),
-                PaneInfo.NO_FIRING),
-            WindowedValue.of(
-                "v2",
-                new Instant(2),
-                Arrays.asList(window(0, 10)),
-                PaneInfo.NO_FIRING),
-            WindowedValue.of(
-                "v3",
-                new Instant(13),
-                Arrays.asList(window(10, 20)),
-                PaneInfo.NO_FIRING));
-
-    assertThat(result.peekOutputElements(), hasSize(2));
-
-    TimestampedValue<KV<String, Iterable<String>>> item0 =
-        Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 10)));
-    assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2"));
-    assertThat(item0.getTimestamp(), equalTo(window(0, 10).maxTimestamp()));
-
-    TimestampedValue<KV<String, Iterable<String>>> item1 =
-        Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(10, 20)));
-    assertThat(item1.getValue().getValue(), contains("v3"));
-    assertThat(item1.getTimestamp(), equalTo(window(10, 20).maxTimestamp()));
-  }
-
-  /**
-   * Tests that for a simple sequence of elements on the same key, the given GABW implementation
-   * correctly groups them into sliding windows.
-   *
-   * <p>In the input here, each element occurs in multiple windows.
-   */
-  public static void groupsElementsIntoSlidingWindowsWithMinTimestamp(
-      GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory)
-          throws Exception {
-
-    WindowingStrategy<?, IntervalWindow> windowingStrategy = WindowingStrategy.of(
-        SlidingWindows.of(Duration.millis(20)).every(Duration.millis(10)))
-        .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp());
-
-    DoFnTester<KV<String, Iterable<WindowedValue<String>>>, KV<String, Iterable<String>>> result =
-        runGABW(gabwFactory, windowingStrategy, "key",
-            WindowedValue.of(
-                "v1",
-                new Instant(5),
-                Arrays.asList(window(-10, 10), window(0, 20)),
-                PaneInfo.NO_FIRING),
-            WindowedValue.of(
-                "v2",
-                new Instant(15),
-                Arrays.asList(window(0, 20), window(10, 30)),
-                PaneInfo.NO_FIRING));
-
-    assertThat(result.peekOutputElements(), hasSize(3));
-
-    TimestampedValue<KV<String, Iterable<String>>> item0 =
-        Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(-10, 10)));
-    assertThat(item0.getValue().getValue(), contains("v1"));
-    assertThat(item0.getTimestamp(), equalTo(new Instant(5)));
-
-    TimestampedValue<KV<String, Iterable<String>>> item1 =
-        Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 20)));
-    assertThat(item1.getValue().getValue(), containsInAnyOrder("v1", "v2"));
-    assertThat(item1.getTimestamp(), equalTo(new Instant(10)));
-
-    TimestampedValue<KV<String, Iterable<String>>> item2 =
-        Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(10, 30)));
-    assertThat(item2.getValue().getValue(), contains("v2"));
-    assertThat(item2.getTimestamp(), equalTo(new Instant(20)));
-  }
-
-  /**
-   * Tests that for a simple sequence of elements on the same key, the given GABW implementation
-   * correctly groups and combines them according to sliding windows.
-   *
-   * <p>In the input here, each element occurs in multiple windows.
-   */
-  public static void combinesElementsInSlidingWindows(
-      GroupAlsoByWindowsDoFnFactory<String, Long, Long> gabwFactory,
-      CombineFn<Long, ?, Long> combineFn)
-          throws Exception {
-
-    WindowingStrategy<?, IntervalWindow> windowingStrategy =
-        WindowingStrategy.of(SlidingWindows.of(Duration.millis(20)).every(Duration.millis(10)))
-            .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp());
-
-    DoFnTester<KV<String, Iterable<WindowedValue<Long>>>, KV<String, Long>> result =
-        runGABW(gabwFactory, windowingStrategy, "k",
-            WindowedValue.of(
-                1L,
-                new Instant(5),
-                Arrays.asList(window(-10, 10), window(0, 20)),
-                PaneInfo.NO_FIRING),
-            WindowedValue.of(
-                2L,
-                new Instant(15),
-                Arrays.asList(window(0, 20), window(10, 30)),
-                PaneInfo.NO_FIRING),
-            WindowedValue.of(
-                4L,
-                new Instant(18),
-                Arrays.asList(window(0, 20), window(10, 30)),
-                PaneInfo.NO_FIRING));
-
-    assertThat(result.peekOutputElements(), hasSize(3));
-
-    TimestampedValue<KV<String, Long>> item0 =
-        Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(-10, 10)));
-    assertThat(item0.getValue().getKey(), equalTo("k"));
-    assertThat(item0.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(1L))));
-    assertThat(item0.getTimestamp(), equalTo(new Instant(5L)));
-
-    TimestampedValue<KV<String, Long>> item1 =
-        Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 20)));
-    assertThat(item1.getValue().getKey(), equalTo("k"));
-    assertThat(item1.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(1L, 2L, 4L))));
-    assertThat(item1.getTimestamp(), equalTo(new Instant(5L)));
-
-    TimestampedValue<KV<String, Long>> item2 =
-        Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(10, 30)));
-    assertThat(item2.getValue().getKey(), equalTo("k"));
-    assertThat(item2.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(2L, 4L))));
-    assertThat(item2.getTimestamp(), equalTo(new Instant(15L)));
-  }
-
-  /**
-   * Tests that the given GABW implementation correctly groups elements that fall into overlapping
-   * windows that are not merged.
-   */
-  public static void groupsIntoOverlappingNonmergingWindows(
-      GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory)
-          throws Exception {
-
-    WindowingStrategy<?, IntervalWindow> windowingStrategy =
-        WindowingStrategy.of(FixedWindows.of(Duration.millis(10)));
-
-    DoFnTester<KV<String, Iterable<WindowedValue<String>>>, KV<String, Iterable<String>>> result =
-        runGABW(gabwFactory, windowingStrategy, "key",
-            WindowedValue.of(
-                "v1",
-                new Instant(1),
-                Arrays.asList(window(0, 5)),
-                PaneInfo.NO_FIRING),
-            WindowedValue.of(
-                "v2",
-                new Instant(4),
-                Arrays.asList(window(1, 5)),
-                PaneInfo.NO_FIRING),
-            WindowedValue.of(
-                "v3",
-                new Instant(4),
-                Arrays.asList(window(0, 5)),
-                PaneInfo.NO_FIRING));
-
-    assertThat(result.peekOutputElements(), hasSize(2));
-
-    TimestampedValue<KV<String, Iterable<String>>> item0 =
-        Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 5)));
-    assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v3"));
-    assertThat(item0.getTimestamp(), equalTo(window(1, 5).maxTimestamp()));
-
-    TimestampedValue<KV<String, Iterable<String>>> item1 =
-        Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(1, 5)));
-    assertThat(item1.getValue().getValue(), contains("v2"));
-    assertThat(item1.getTimestamp(), equalTo(window(0, 5).maxTimestamp()));
-  }
-
-  /**
-   * Tests that the given GABW implementation correctly groups elements into merged sessions.
-   */
-  public static void groupsElementsInMergedSessions(
-      GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory)
-          throws Exception {
-
-    WindowingStrategy<?, IntervalWindow> windowingStrategy =
-        WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10)));
-
-    DoFnTester<KV<String, Iterable<WindowedValue<String>>>, KV<String, Iterable<String>>> result =
-        runGABW(gabwFactory, windowingStrategy, "key",
-            WindowedValue.of(
-                "v1",
-                new Instant(0),
-                Arrays.asList(window(0, 10)),
-                PaneInfo.NO_FIRING),
-            WindowedValue.of(
-                "v2",
-                new Instant(5),
-                Arrays.asList(window(5, 15)),
-                PaneInfo.NO_FIRING),
-            WindowedValue.of(
-                "v3",
-                new Instant(15),
-                Arrays.asList(window(15, 25)),
-                PaneInfo.NO_FIRING));
-
-    assertThat(result.peekOutputElements(), hasSize(2));
-
-    TimestampedValue<KV<String, Iterable<String>>> item0 =
-        Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 15)));
-    assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2"));
-    assertThat(item0.getTimestamp(), equalTo(window(0, 15).maxTimestamp()));
-
-    TimestampedValue<KV<String, Iterable<String>>> item1 =
-        Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(15, 25)));
-    assertThat(item1.getValue().getValue(), contains("v3"));
-    assertThat(item1.getTimestamp(), equalTo(window(15, 25).maxTimestamp()));
-  }
-
-  /**
-   * Tests that the given {@link GroupAlsoByWindowsDoFn} implementation combines elements per
-   * session window correctly according to the provided {@link CombineFn}.
-   */
-  public static void combinesElementsPerSession(
-      GroupAlsoByWindowsDoFnFactory<String, Long, Long> gabwFactory,
-      CombineFn<Long, ?, Long> combineFn)
-          throws Exception {
-
-    WindowingStrategy<?, IntervalWindow> windowingStrategy =
-        WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10)));
-
-    DoFnTester<KV<String, Iterable<WindowedValue<Long>>>, KV<String, Long>> result =
-        runGABW(gabwFactory, windowingStrategy, "k",
-            WindowedValue.of(
-                1L,
-                new Instant(0),
-                Arrays.asList(window(0, 10)),
-                PaneInfo.NO_FIRING),
-            WindowedValue.of(
-                2L,
-                new Instant(5),
-                Arrays.asList(window(5, 15)),
-                PaneInfo.NO_FIRING),
-            WindowedValue.of(
-                4L,
-                new Instant(15),
-                Arrays.asList(window(15, 25)),
-                PaneInfo.NO_FIRING));
-
-    assertThat(result.peekOutputElements(), hasSize(2));
-
-    TimestampedValue<KV<String, Long>> item0 =
-        Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 15)));
-    assertThat(item0.getValue().getKey(), equalTo("k"));
-    assertThat(item0.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(1L, 2L))));
-    assertThat(item0.getTimestamp(), equalTo(window(0, 15).maxTimestamp()));
-
-    TimestampedValue<KV<String, Long>> item1 =
-        Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(15, 25)));
-    assertThat(item1.getValue().getKey(), equalTo("k"));
-    assertThat(item1.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(4L))));
-    assertThat(item1.getTimestamp(), equalTo(window(15, 25).maxTimestamp()));
-  }
-
-  /**
-   * Tests that for a simple sequence of elements on the same key, the given GABW implementation
-   * correctly groups them according to fixed windows and also sets the output timestamp
-   * according to the policy {@link OutputTimeFns#outputAtEndOfWindow()}.
-   */
-  public static void groupsElementsIntoFixedWindowsWithEndOfWindowTimestamp(
-      GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory)
-          throws Exception {
-
-    WindowingStrategy<?, IntervalWindow> windowingStrategy =
-        WindowingStrategy.of(FixedWindows.of(Duration.millis(10)))
-        .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow());
-
-    DoFnTester<KV<String, Iterable<WindowedValue<String>>>, KV<String, Iterable<String>>> result =
-        runGABW(gabwFactory, windowingStrategy, "key",
-            WindowedValue.of(
-                "v1",
-                new Instant(1),
-                Arrays.asList(window(0, 10)),
-                PaneInfo.NO_FIRING),
-            WindowedValue.of(
-                "v2",
-                new Instant(2),
-                Arrays.asList(window(0, 10)),
-                PaneInfo.NO_FIRING),
-            WindowedValue.of(
-                "v3",
-                new Instant(13),
-                Arrays.asList(window(10, 20)),
-                PaneInfo.NO_FIRING));
-
-    assertThat(result.peekOutputElements(), hasSize(2));
-
-    TimestampedValue<KV<String, Iterable<String>>> item0 =
-        Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 10)));
-    assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2"));
-    assertThat(item0.getTimestamp(), equalTo(window(0, 10).maxTimestamp()));
-
-    TimestampedValue<KV<String, Iterable<String>>> item1 =
-        Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(10, 20)));
-    assertThat(item1.getValue().getValue(), contains("v3"));
-    assertThat(item1.getTimestamp(), equalTo(window(10, 20).maxTimestamp()));
-  }
-
-  /**
-   * Tests that for a simple sequence of elements on the same key, the given GABW implementation
-   * correctly groups them according to fixed windows and also sets the output timestamp
-   * according to the policy {@link OutputTimeFns#outputAtLatestInputTimestamp()}.
-   */
-  public static void groupsElementsIntoFixedWindowsWithLatestTimestamp(
-      GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory)
-          throws Exception {
-
-    WindowingStrategy<?, IntervalWindow> windowingStrategy =
-        WindowingStrategy.of(FixedWindows.of(Duration.millis(10)))
-        .withOutputTimeFn(OutputTimeFns.outputAtLatestInputTimestamp());
-
-    DoFnTester<KV<String, Iterable<WindowedValue<String>>>, KV<String, Iterable<String>>> result =
-        runGABW(gabwFactory, windowingStrategy, "k",
-            WindowedValue.of(
-                "v1",
-                new Instant(1),
-                Arrays.asList(window(0, 10)),
-                PaneInfo.NO_FIRING),
-            WindowedValue.of(
-                "v2",
-                new Instant(2),
-                Arrays.asList(window(0, 10)),
-                PaneInfo.NO_FIRING),
-            WindowedValue.of(
-                "v3",
-                new Instant(13),
-                Arrays.asList(window(10, 20)),
-                PaneInfo.NO_FIRING));
-
-    assertThat(result.peekOutputElements(), hasSize(2));
-
-    TimestampedValue<KV<String, Iterable<String>>> item0 =
-        Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 10)));
-    assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2"));
-    assertThat(item0.getTimestamp(), equalTo(new Instant(2)));
-
-    TimestampedValue<KV<String, Iterable<String>>> item1 =
-        Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(10, 20)));
-    assertThat(item1.getValue().getValue(), contains("v3"));
-    assertThat(item1.getTimestamp(), equalTo(new Instant(13)));
-  }
-
-  /**
-   * Tests that the given GABW implementation correctly groups elements into merged sessions
-   * with output timestamps at the end of the merged window.
-   */
-  public static void groupsElementsInMergedSessionsWithEndOfWindowTimestamp(
-      GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory)
-          throws Exception {
-
-    WindowingStrategy<?, IntervalWindow> windowingStrategy =
-        WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10)))
-            .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow());
-
-    DoFnTester<KV<String, Iterable<WindowedValue<String>>>, KV<String, Iterable<String>>> result =
-        runGABW(gabwFactory, windowingStrategy, "k",
-            WindowedValue.of(
-                "v1",
-                new Instant(0),
-                Arrays.asList(window(0, 10)),
-                PaneInfo.NO_FIRING),
-            WindowedValue.of(
-                "v2",
-                new Instant(5),
-                Arrays.asList(window(5, 15)),
-                PaneInfo.NO_FIRING),
-            WindowedValue.of(
-                "v3",
-                new Instant(15),
-                Arrays.asList(window(15, 25)),
-                PaneInfo.NO_FIRING));
-
-    assertThat(result.peekOutputElements(), hasSize(2));
-
-    TimestampedValue<KV<String, Iterable<String>>> item0 =
-        Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 15)));
-    assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2"));
-    assertThat(item0.getTimestamp(), equalTo(window(0, 15).maxTimestamp()));
-
-    TimestampedValue<KV<String, Iterable<String>>> item1 =
-        Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(15, 25)));
-    assertThat(item1.getValue().getValue(), contains("v3"));
-    assertThat(item1.getTimestamp(), equalTo(window(15, 25).maxTimestamp()));
-  }
-
-  /**
-   * Tests that the given GABW implementation correctly groups elements into merged sessions
-   * with output timestamps at the end of the merged window.
-   */
-  public static void groupsElementsInMergedSessionsWithLatestTimestamp(
-      GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory)
-          throws Exception {
-
-    WindowingStrategy<?, IntervalWindow> windowingStrategy =
-        WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10)))
-            .withOutputTimeFn(OutputTimeFns.outputAtLatestInputTimestamp());
-
-    BoundedWindow unmergedWindow = window(15, 25);
-    DoFnTester<KV<String, Iterable<WindowedValue<String>>>, KV<String, Iterable<String>>> result =
-        runGABW(gabwFactory, windowingStrategy, "k",
-            WindowedValue.of(
-                "v1",
-                new Instant(0),
-                Arrays.asList(window(0, 10)),
-                PaneInfo.NO_FIRING),
-            WindowedValue.of(
-                "v2",
-                new Instant(5),
-                Arrays.asList(window(5, 15)),
-                PaneInfo.NO_FIRING),
-            WindowedValue.of(
-                "v3",
-                new Instant(15),
-                Arrays.asList(unmergedWindow),
-                PaneInfo.NO_FIRING));
-
-    assertThat(result.peekOutputElements(), hasSize(2));
-
-    BoundedWindow mergedWindow = window(0, 15);
-    TimestampedValue<KV<String, Iterable<String>>> item0 =
-        Iterables.getOnlyElement(result.peekOutputElementsInWindow(mergedWindow));
-    assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2"));
-    assertThat(item0.getTimestamp(), equalTo(new Instant(5)));
-
-    TimestampedValue<KV<String, Iterable<String>>> item1 =
-        Iterables.getOnlyElement(result.peekOutputElementsInWindow(unmergedWindow));
-    assertThat(item1.getValue().getValue(), contains("v3"));
-    assertThat(item1.getTimestamp(), equalTo(new Instant(15)));
-  }
-
-  /**
-   * Tests that the given {@link GroupAlsoByWindowsDoFn} implementation combines elements per
-   * session window correctly according to the provided {@link CombineFn}.
-   */
-  public static void combinesElementsPerSessionWithEndOfWindowTimestamp(
-      GroupAlsoByWindowsDoFnFactory<String, Long, Long> gabwFactory,
-      CombineFn<Long, ?, Long> combineFn)
-          throws Exception {
-
-    WindowingStrategy<?, IntervalWindow> windowingStrategy =
-        WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10)))
-        .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow());
-
-    BoundedWindow secondWindow = window(15, 25);
-    DoFnTester<?, KV<String, Long>> result =
-        runGABW(gabwFactory, windowingStrategy, "k",
-            WindowedValue.of(
-                1L,
-                new Instant(0),
-                Arrays.asList(window(0, 10)),
-                PaneInfo.NO_FIRING),
-            WindowedValue.of(
-                2L,
-                new Instant(5),
-                Arrays.asList(window(5, 15)),
-                PaneInfo.NO_FIRING),
-            WindowedValue.of(
-                4L,
-                new Instant(15),
-                Arrays.asList(secondWindow),
-                PaneInfo.NO_FIRING));
-
-    assertThat(result.peekOutputElements(), hasSize(2));
-
-    BoundedWindow firstResultWindow = window(0, 15);
-    TimestampedValue<KV<String, Long>> item0 =
-        Iterables.getOnlyElement(result.peekOutputElementsInWindow(firstResultWindow));
-    assertThat(item0.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(1L, 2L))));
-    assertThat(item0.getTimestamp(), equalTo(firstResultWindow.maxTimestamp()));
-
-    TimestampedValue<KV<String, Long>> item1 =
-        Iterables.getOnlyElement(result.peekOutputElementsInWindow(secondWindow));
-    assertThat(item1.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(4L))));
-    assertThat(item1.getTimestamp(),
-        equalTo(secondWindow.maxTimestamp()));
-  }
-
-  @SafeVarargs
-  private static <K, InputT, OutputT, W extends BoundedWindow>
-  DoFnTester<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>> runGABW(
-      GroupAlsoByWindowsDoFnFactory<K, InputT, OutputT> gabwFactory,
-      WindowingStrategy<?, W> windowingStrategy,
-      K key,
-      WindowedValue<InputT>... values) throws Exception {
-    return runGABW(gabwFactory, windowingStrategy, key, Arrays.asList(values));
-  }
-
-  private static <K, InputT, OutputT, W extends BoundedWindow>
-  DoFnTester<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>> runGABW(
-      GroupAlsoByWindowsDoFnFactory<K, InputT, OutputT> gabwFactory,
-      WindowingStrategy<?, W> windowingStrategy,
-      K key,
-      Collection<WindowedValue<InputT>> values) throws Exception {
-
-    TupleTag<KV<K, OutputT>> outputTag = new TupleTag<>();
-    DoFnRunnerBase.ListOutputManager outputManager = new DoFnRunnerBase.ListOutputManager();
-
-    DoFnTester<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>> tester =
-        DoFnTester.of(gabwFactory.forStrategy(windowingStrategy));
-    tester.startBundle();
-    tester.processElement(KV.<K, Iterable<WindowedValue<InputT>>>of(key, values));
-    tester.finishBundle();
-
-    // Sanity check for corruption
-    for (KV<K, OutputT> elem : tester.peekOutputElements()) {
-      assertThat(elem.getKey(), equalTo(key));
-    }
-
-    return tester;
-  }
-
-  private static BoundedWindow window(long start, long end) {
-    return new IntervalWindow(new Instant(start), new Instant(end));
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFnTest.java
deleted file mode 100644
index 4ac6164..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFnTest.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.GroupAlsoByWindowsProperties.GroupAlsoByWindowsDoFnFactory;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Unit tests for {@link GroupAlsoByWindowsViaOutputBufferDoFn}.
- */
-@RunWith(JUnit4.class)
-public class GroupAlsoByWindowsViaOutputBufferDoFnTest {
-
-  private class BufferingGABWViaOutputBufferDoFnFactory<K, InputT>
-  implements GroupAlsoByWindowsDoFnFactory<K, InputT, Iterable<InputT>> {
-
-    private final Coder<InputT> inputCoder;
-
-    public BufferingGABWViaOutputBufferDoFnFactory(Coder<InputT> inputCoder) {
-      this.inputCoder = inputCoder;
-    }
-
-    @Override
-    public <W extends BoundedWindow> GroupAlsoByWindowsDoFn<K, InputT, Iterable<InputT>, W>
-        forStrategy(WindowingStrategy<?, W> windowingStrategy) {
-      return new GroupAlsoByWindowsViaOutputBufferDoFn<K, InputT, Iterable<InputT>, W>(
-          windowingStrategy,
-          SystemReduceFn.<K, InputT, W>buffering(inputCoder));
-    }
-  }
-
-  @Test
-  public void testEmptyInputEmptyOutput() throws Exception {
-    GroupAlsoByWindowsProperties.emptyInputEmptyOutput(
-        new BufferingGABWViaOutputBufferDoFnFactory<>(StringUtf8Coder.of()));
-  }
-
-  @Test
-  public void testGroupsElementsIntoFixedWindows() throws Exception {
-    GroupAlsoByWindowsProperties.groupsElementsIntoFixedWindows(
-        new BufferingGABWViaOutputBufferDoFnFactory<String, String>(StringUtf8Coder.of()));
-  }
-
-  @Test
-  public void testGroupsElementsIntoSlidingWindows() throws Exception {
-    GroupAlsoByWindowsProperties.groupsElementsIntoSlidingWindowsWithMinTimestamp(
-        new BufferingGABWViaOutputBufferDoFnFactory<String, String>(StringUtf8Coder.of()));
-  }
-
-  @Test
-  public void testGroupsIntoOverlappingNonmergingWindows() throws Exception {
-    GroupAlsoByWindowsProperties.groupsIntoOverlappingNonmergingWindows(
-        new BufferingGABWViaOutputBufferDoFnFactory<String, String>(StringUtf8Coder.of()));
-  }
-
-  @Test
-  public void testGroupsIntoSessions() throws Exception {
-    GroupAlsoByWindowsProperties.groupsElementsInMergedSessions(
-        new BufferingGABWViaOutputBufferDoFnFactory<String, String>(StringUtf8Coder.of()));
-  }
-
-  @Test
-  public void testGroupsElementsIntoFixedWindowsWithEndOfWindowTimestamp() throws Exception {
-    GroupAlsoByWindowsProperties.groupsElementsIntoFixedWindowsWithEndOfWindowTimestamp(
-        new BufferingGABWViaOutputBufferDoFnFactory<String, String>(StringUtf8Coder.of()));
-  }
-
-  @Test
-  public void testGroupsElementsIntoFixedWindowsWithLatestTimestamp() throws Exception {
-    GroupAlsoByWindowsProperties.groupsElementsIntoFixedWindowsWithLatestTimestamp(
-        new BufferingGABWViaOutputBufferDoFnFactory<String, String>(StringUtf8Coder.of()));
-  }
-
-  @Test
-  public void testGroupsElementsIntoSessionsWithEndOfWindowTimestamp() throws Exception {
-    GroupAlsoByWindowsProperties.groupsElementsInMergedSessionsWithEndOfWindowTimestamp(
-        new BufferingGABWViaOutputBufferDoFnFactory<String, String>(StringUtf8Coder.of()));
-  }
-
-  @Test
-  public void testGroupsElementsIntoSessionsWithLatestTimestamp() throws Exception {
-    GroupAlsoByWindowsProperties.groupsElementsInMergedSessionsWithLatestTimestamp(
-        new BufferingGABWViaOutputBufferDoFnFactory<String, String>(StringUtf8Coder.of()));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/sdks/java/core/src/test/java/org/apache/beam/sdk/util/LateDataDroppingDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/LateDataDroppingDoFnRunnerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/LateDataDroppingDoFnRunnerTest.java
deleted file mode 100644
index d929d39..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/LateDataDroppingDoFnRunnerTest.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.when;
-
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.LateDataDroppingDoFnRunner.LateDataFilter;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-import java.util.Arrays;
-
-/**
- * Unit tests for {@link LateDataDroppingDoFnRunner}.
- */
-@RunWith(JUnit4.class)
-public class LateDataDroppingDoFnRunnerTest {
-  private static final FixedWindows WINDOW_FN = FixedWindows.of(Duration.millis(10));
-
-  @Mock private TimerInternals mockTimerInternals;
-
-  @Before
-  public void setUp() {
-    MockitoAnnotations.initMocks(this);
-  }
-
-  @Test
-  public void testLateDataFilter() throws Exception {
-    when(mockTimerInternals.currentInputWatermarkTime()).thenReturn(new Instant(15L));
-
-    InMemoryLongSumAggregator droppedDueToLateness =
-        new InMemoryLongSumAggregator("droppedDueToLateness");
-    LateDataFilter lateDataFilter = new LateDataFilter(
-        WindowingStrategy.of(WINDOW_FN), mockTimerInternals, droppedDueToLateness);
-
-    Iterable<WindowedValue<Integer>> actual = lateDataFilter.filter(
-        "a",
-        ImmutableList.of(
-            createDatum(13, 13L),
-            createDatum(5, 5L), // late element, earlier than 4L.
-            createDatum(16, 16L),
-            createDatum(18, 18L)));
-
-    Iterable<WindowedValue<Integer>> expected =  ImmutableList.of(
-        createDatum(13, 13L),
-        createDatum(16, 16L),
-        createDatum(18, 18L));
-    assertThat(expected, containsInAnyOrder(Iterables.toArray(actual, WindowedValue.class)));
-    assertEquals(1, droppedDueToLateness.sum);
-  }
-
-  private <T> WindowedValue<T> createDatum(T element, long timestampMillis) {
-    Instant timestamp = new Instant(timestampMillis);
-    return WindowedValue.of(
-        element,
-        timestamp,
-        Arrays.asList(WINDOW_FN.assignWindow(timestamp)),
-        PaneInfo.NO_FIRING);
-  }
-
-  private static class InMemoryLongSumAggregator implements Aggregator<Long, Long> {
-    private final String name;
-    private long sum = 0;
-
-    public InMemoryLongSumAggregator(String name) {
-      this.name = name;
-    }
-
-    @Override
-    public void addValue(Long value) {
-      sum += value;
-    }
-
-    @Override
-    public String getName() {
-      return name;
-    }
-
-    @Override
-    public CombineFn<Long, ?, Long> getCombineFn() {
-      return new Sum.SumLongFn();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunnerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunnerTest.java
deleted file mode 100644
index 8885118..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunnerTest.java
+++ /dev/null
@@ -1,234 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.emptyIterable;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.when;
-
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-
-import com.google.common.collect.ImmutableList;
-
-import org.hamcrest.Matchers;
-import org.joda.time.Instant;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.MockitoAnnotations;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Tests for {@link PushbackSideInputDoFnRunner}.
- */
-@RunWith(JUnit4.class)
-public class PushbackSideInputDoFnRunnerTest {
-  @Mock private ReadyCheckingSideInputReader reader;
-  private TestDoFnRunner<Integer, Integer> underlying;
-  private PCollectionView<Integer> singletonView;
-
-  @Before
-  public void setup() {
-    MockitoAnnotations.initMocks(this);
-    TestPipeline p = TestPipeline.create();
-    PCollection<Integer> created = p.apply(Create.of(1, 2, 3));
-    singletonView =
-        created
-            .apply(Window.into(new IdentitySideInputWindowFn()))
-            .apply(Sum.integersGlobally().asSingletonView());
-
-    underlying = new TestDoFnRunner<>();
-  }
-
-  private PushbackSideInputDoFnRunner<Integer, Integer> createRunner(
-      ImmutableList<PCollectionView<?>> views) {
-    PushbackSideInputDoFnRunner<Integer, Integer> runner =
-        PushbackSideInputDoFnRunner.create(underlying, views, reader);
-    runner.startBundle();
-    return runner;
-  }
-
-  @Test
-  public void startFinishBundleDelegates() {
-    PushbackSideInputDoFnRunner runner =
-        createRunner(ImmutableList.<PCollectionView<?>>of(singletonView));
-
-    assertThat(underlying.started, is(true));
-    assertThat(underlying.finished, is(false));
-    runner.finishBundle();
-    assertThat(underlying.finished, is(true));
-  }
-
-  @Test
-  public void processElementSideInputNotReady() {
-    when(reader.isReady(Mockito.eq(singletonView), Mockito.any(BoundedWindow.class)))
-        .thenReturn(false);
-
-    PushbackSideInputDoFnRunner<Integer, Integer> runner =
-        createRunner(ImmutableList.<PCollectionView<?>>of(singletonView));
-
-    WindowedValue<Integer> oneWindow =
-        WindowedValue.of(
-            2,
-            new Instant(-2),
-            new IntervalWindow(new Instant(-500L), new Instant(0L)),
-            PaneInfo.ON_TIME_AND_ONLY_FIRING);
-    Iterable<WindowedValue<Integer>> oneWindowPushback =
-        runner.processElementInReadyWindows(oneWindow);
-    assertThat(oneWindowPushback, containsInAnyOrder(oneWindow));
-    assertThat(underlying.inputElems, Matchers.<WindowedValue<Integer>>emptyIterable());
-  }
-
-  @Test
-  public void processElementSideInputNotReadyMultipleWindows() {
-    when(reader.isReady(Mockito.eq(singletonView), Mockito.any(BoundedWindow.class)))
-        .thenReturn(false);
-
-    PushbackSideInputDoFnRunner<Integer, Integer> runner =
-        createRunner(ImmutableList.<PCollectionView<?>>of(singletonView));
-
-    WindowedValue<Integer> multiWindow =
-        WindowedValue.of(
-            2,
-            new Instant(-2),
-            ImmutableList.of(
-                new IntervalWindow(new Instant(-500L), new Instant(0L)),
-                new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L)),
-                GlobalWindow.INSTANCE),
-            PaneInfo.ON_TIME_AND_ONLY_FIRING);
-    Iterable<WindowedValue<Integer>> multiWindowPushback =
-        runner.processElementInReadyWindows(multiWindow);
-    assertThat(multiWindowPushback, equalTo(multiWindow.explodeWindows()));
-    assertThat(underlying.inputElems, Matchers.<WindowedValue<Integer>>emptyIterable());
-  }
-
-  @Test
-  public void processElementSideInputNotReadySomeWindows() {
-    when(reader.isReady(Mockito.eq(singletonView), Mockito.eq(GlobalWindow.INSTANCE)))
-        .thenReturn(false);
-    when(
-            reader.isReady(
-                Mockito.eq(singletonView),
-                org.mockito.AdditionalMatchers.not(Mockito.eq(GlobalWindow.INSTANCE))))
-        .thenReturn(true);
-
-    PushbackSideInputDoFnRunner<Integer, Integer> runner =
-        createRunner(ImmutableList.<PCollectionView<?>>of(singletonView));
-
-    IntervalWindow littleWindow = new IntervalWindow(new Instant(-500L), new Instant(0L));
-    IntervalWindow bigWindow =
-        new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L));
-    WindowedValue<Integer> multiWindow =
-        WindowedValue.of(
-            2,
-            new Instant(-2),
-            ImmutableList.of(littleWindow, bigWindow, GlobalWindow.INSTANCE),
-            PaneInfo.NO_FIRING);
-    Iterable<WindowedValue<Integer>> multiWindowPushback =
-        runner.processElementInReadyWindows(multiWindow);
-    assertThat(
-        multiWindowPushback,
-        containsInAnyOrder(WindowedValue.timestampedValueInGlobalWindow(2, new Instant(-2L))));
-    assertThat(underlying.inputElems,
-        containsInAnyOrder(WindowedValue.of(2, new Instant(-2), littleWindow, PaneInfo.NO_FIRING),
-            WindowedValue.of(2, new Instant(-2), bigWindow, PaneInfo.NO_FIRING)));
-  }
-
-  @Test
-  public void processElementSideInputReadyAllWindows() {
-    when(reader.isReady(Mockito.eq(singletonView), Mockito.any(BoundedWindow.class)))
-        .thenReturn(true);
-
-    ImmutableList<PCollectionView<?>> views = ImmutableList.<PCollectionView<?>>of(singletonView);
-    PushbackSideInputDoFnRunner<Integer, Integer> runner = createRunner(views);
-
-    WindowedValue<Integer> multiWindow =
-        WindowedValue.of(
-            2,
-            new Instant(-2),
-            ImmutableList.of(
-                new IntervalWindow(new Instant(-500L), new Instant(0L)),
-                new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L)),
-                GlobalWindow.INSTANCE),
-            PaneInfo.ON_TIME_AND_ONLY_FIRING);
-    Iterable<WindowedValue<Integer>> multiWindowPushback =
-        runner.processElementInReadyWindows(multiWindow);
-    assertThat(multiWindowPushback, emptyIterable());
-    assertThat(underlying.inputElems,
-        containsInAnyOrder(ImmutableList.copyOf(multiWindow.explodeWindows()).toArray()));
-  }
-
-  @Test
-  public void processElementNoSideInputs() {
-    PushbackSideInputDoFnRunner<Integer, Integer> runner =
-        createRunner(ImmutableList.<PCollectionView<?>>of());
-
-    WindowedValue<Integer> multiWindow =
-        WindowedValue.of(
-            2,
-            new Instant(-2),
-            ImmutableList.of(
-                new IntervalWindow(new Instant(-500L), new Instant(0L)),
-                new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L)),
-                GlobalWindow.INSTANCE),
-            PaneInfo.ON_TIME_AND_ONLY_FIRING);
-    Iterable<WindowedValue<Integer>> multiWindowPushback =
-        runner.processElementInReadyWindows(multiWindow);
-    assertThat(multiWindowPushback, emptyIterable());
-    assertThat(underlying.inputElems, containsInAnyOrder(multiWindow));
-  }
-
-  private static class TestDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
-    List<WindowedValue<InputT>> inputElems;
-    private boolean started = false;
-    private boolean finished = false;
-
-    @Override
-    public void startBundle() {
-      started = true;
-      inputElems = new ArrayList<>();
-    }
-
-    @Override
-    public void processElement(WindowedValue<InputT> elem) {
-      inputElems.add(elem);
-    }
-
-    @Override
-    public void finishBundle() {
-      finished = true;
-    }
-  }
-}


Mime
View raw message