beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [09/17] incubator-beam git commit: Move triggers to runners-core
Date Thu, 13 Oct 2016 22:22:43 GMT
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java
deleted file mode 100644
index a960aa4..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java
+++ /dev/null
@@ -1,527 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import com.google.common.base.Joiner;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.util.ExecutableTrigger;
-import org.apache.beam.sdk.util.TimeDomain;
-import org.apache.beam.sdk.util.state.MergingStateAccessor;
-import org.apache.beam.sdk.util.state.StateAccessor;
-import org.joda.time.Instant;
-
-/**
- * {@code Trigger}s control when the elements for a specific key and window are output. As elements
- * arrive, they are put into one or more windows by a {@link Window} transform and its associated
- * {@link WindowFn}, and then passed to the associated {@code Trigger} to determine if the
- * {@code Window}s contents should be output.
- *
- * <p>See {@link org.apache.beam.sdk.transforms.GroupByKey} and {@link Window}
- * for more information about how grouping with windows works.
- *
- * <p>The elements that are assigned to a window since the last time it was fired (or since the
- * window was created) are placed into the current window pane. Triggers are evaluated against the
- * elements as they are added. When the root trigger fires, the elements in the current pane will be
- * output. When the root trigger finishes (indicating it will never fire again), the window is
- * closed and any new elements assigned to that window are discarded.
- *
- * <p>Several predefined {@code Trigger}s are provided:
- * <ul>
- *   <li> {@link AfterWatermark} for firing when the watermark passes a timestamp determined from
- *   either the end of the window or the arrival of the first element in a pane.
- *   <li> {@link AfterProcessingTime} for firing after some amount of processing time has elapsed
- *   (typically since the first element in a pane).
- *   <li> {@link AfterPane} for firing off a property of the elements in the current pane, such as
- *   the number of elements that have been assigned to the current pane.
- * </ul>
- *
- * <p>In addition, {@code Trigger}s can be combined in a variety of ways:
- * <ul>
- *   <li> {@link Repeatedly#forever} to create a trigger that executes forever. Any time its
- *   argument finishes it gets reset and starts over. Can be combined with
- *   {@link Trigger#orFinally} to specify a condition that causes the repetition to stop.
- *   <li> {@link AfterEach#inOrder} to execute each trigger in sequence, firing each (and every)
- *   time that a trigger fires, and advancing to the next trigger in the sequence when it finishes.
- *   <li> {@link AfterFirst#of} to create a trigger that fires after at least one of its arguments
- *   fires. An {@link AfterFirst} trigger finishes after it fires once.
- *   <li> {@link AfterAll#of} to create a trigger that fires after all of its arguments
- *   have fired at least once. An {@link AfterAll} trigger finishes after it fires once.
- * </ul>
- *
- * <p>Each trigger tree is instantiated per-key and per-window. Every trigger in the tree is in one
- * of the following states:
- * <ul>
- *   <li> Never Existed - before the trigger has started executing, there is no state associated
- *   with it anywhere in the system. A trigger moves to the executing state as soon as it
- *   processes an element in the current pane.
- *   <li> Executing - while the trigger is receiving items and may fire. While it is in this state,
- *   it may persist book-keeping information to persisted state, set timers, etc.
- *   <li> Finished - after a trigger finishes, all of its book-keeping data is cleaned up, and the
- *   system remembers only that it is finished. Entering this state causes us to discard any
- *   elements in the buffer for that window, as well.
- * </ul>
- *
- * <p>Once finished, a trigger cannot return itself to an earlier state; however, a composite
- * trigger could reset its sub-triggers.
- *
- * <p>Triggers should not build up any state internally since they may be recreated
- * between invocations of the callbacks. All important values should be persisted using
- * state before the callback returns.
- */
-@Experimental(Experimental.Kind.TRIGGER)
-public abstract class Trigger implements Serializable {
-
-  /**
-   * Interface for accessing information about the trigger being executed and other triggers in the
-   * same tree.
-   */
-  public interface TriggerInfo {
-
-    /**
-     * Returns true if the windowing strategy of the current {@code PCollection} is a merging
-     * WindowFn. If true, the trigger execution needs to keep enough information to support the
-     * possibility of {@link Trigger#onMerge} being called. If false, {@link Trigger#onMerge} will
-     * never be called.
-     */
-    boolean isMerging();
-
-    /**
-     * Returns the executable versions of the sub-triggers of the current trigger.
-     */
-    Iterable<ExecutableTrigger> subTriggers();
-
-    /**
-     * Returns the executable version of the specified sub-trigger.
-     *
-     * @param subtriggerIndex identifies which sub-trigger of the current trigger to return
-     */
-    ExecutableTrigger subTrigger(int subtriggerIndex);
-
-    /**
-     * Returns true if the current trigger is marked finished.
-     */
-    boolean isFinished();
-
-    /**
-     * Returns true if the given sub-trigger is marked finished.
-     *
-     * @param subtriggerIndex identifies which sub-trigger of the current trigger to check
-     */
-    boolean isFinished(int subtriggerIndex);
-
-    /**
-     * Returns true if all the sub-triggers of the current trigger are marked finished.
-     */
-    boolean areAllSubtriggersFinished();
-
-    /**
-     * Returns an iterable over the unfinished sub-triggers of the current trigger.
-     */
-    Iterable<ExecutableTrigger> unfinishedSubTriggers();
-
-    /**
-     * Returns the first unfinished sub-trigger.
-     */
-    ExecutableTrigger firstUnfinishedSubTrigger();
-
-    /**
-     * Clears all keyed state for triggers in the current sub-tree and unsets all the associated
-     * finished bits.
-     */
-    void resetTree() throws Exception;
-
-    /**
-     * Sets the finished bit for the current trigger.
-     *
-     * @param finished the new value of the finished bit
-     */
-    void setFinished(boolean finished);
-
-    /**
-     * Sets the finished bit for the given sub-trigger.
-     *
-     * @param finished the new value of the finished bit
-     * @param subTriggerIndex identifies which sub-trigger of the current trigger to update
-     */
-    void setFinished(boolean finished, int subTriggerIndex);
-  }
-
-  /**
-   * Interact with properties of the trigger being executed, with extensions to deal with the
-   * merging windows.
-   */
-  public interface MergingTriggerInfo extends TriggerInfo {
-
-    /**
-     * Reports whether the trigger is finished in at least one of the windows being merged.
-     */
-    boolean finishedInAnyMergingWindow();
-
-    /**
-     * Reports whether the trigger is finished in every one of the windows being merged.
-     */
-    boolean finishedInAllMergingWindows();
-  }
-
-  /**
-   * Information accessible to all operational hooks in this {@code Trigger}.
-   *
-   * <p>Used directly in {@link Trigger#shouldFire} and {@link Trigger#clear}, and
-   * extended with additional information in other methods.
-   */
-  public abstract class TriggerContext {
-    // Declared as a non-static inner class, so every context instance is tied to an enclosing
-    // Trigger instance.
-
-    /** Returns the interface for accessing trigger info. */
-    public abstract TriggerInfo trigger();
-
-    /** Returns the interface for accessing persistent state. */
-    public abstract StateAccessor<?> state();
-
-    /** The window that the current context is executing in. */
-    public abstract BoundedWindow window();
-
-    /**
-     * Create a sub-context for the given sub-trigger.
-     *
-     * @param trigger the executable trigger the returned context is created for
-     */
-    public abstract TriggerContext forTrigger(ExecutableTrigger trigger);
-
-    /**
-     * Removes the timer set in this trigger context for the given {@link Instant}
-     * and {@link TimeDomain}.
-     *
-     * @param timestamp the timestamp of the timer to remove
-     * @param domain the time domain of the timer to remove
-     */
-    public abstract void deleteTimer(Instant timestamp, TimeDomain domain);
-
-    /** The current processing time. */
-    public abstract Instant currentProcessingTime();
-
-    /** The current synchronized upstream processing time or {@code null} if unknown. */
-    @Nullable
-    public abstract Instant currentSynchronizedProcessingTime();
-
-    /** The current event time for the input or {@code null} if unknown. */
-    @Nullable
-    public abstract Instant currentEventTime();
-  }
-
-  /**
-   * Extended {@link TriggerContext} containing information accessible to the {@link #onElement}
-   * operational hook.
-   */
-  public abstract class OnElementContext extends TriggerContext {
-    /** The event timestamp of the element currently being processed. */
-    public abstract Instant eventTimestamp();
-
-    /**
-     * Sets a timer to fire when the watermark or processing time is beyond the given timestamp.
-     * Timers are not guaranteed to fire immediately, but will be delivered at some time afterwards.
-     *
-     * <p>As with {@link #state}, timers are implicitly scoped to the current window. All
-     * timer firings for a window will be received, but the implementation should choose to ignore
-     * those that are not applicable.
-     *
-     * @param timestamp the time at which the trigger should be re-evaluated
-     * @param domain the domain that the {@code timestamp} applies to
-     */
-    public abstract void setTimer(Instant timestamp, TimeDomain domain);
-
-    /**
-     * Create an {@code OnElementContext} for executing the given trigger. Overrides
-     * {@link TriggerContext#forTrigger} with a narrower return type.
-     *
-     * @param trigger the executable trigger the returned context is created for
-     */
-    @Override
-    public abstract OnElementContext forTrigger(ExecutableTrigger trigger);
-  }
-
-  /**
-   * Extended {@link TriggerContext} containing information accessible to the {@link #onMerge}
-   * operational hook.
-   */
-  public abstract class OnMergeContext extends TriggerContext {
-    /**
-     * Sets a timer to fire when the watermark or processing time is beyond the given timestamp.
-     * Timers are not guaranteed to fire immediately, but will be delivered at some time afterwards.
-     *
-     * <p>As with {@link #state}, timers are implicitly scoped to the current window. All
-     * timer firings for a window will be received, but the implementation should choose to ignore
-     * those that are not applicable.
-     *
-     * @param timestamp the time at which the trigger should be re-evaluated
-     * @param domain the domain that the {@code timestamp} applies to
-     */
-    public abstract void setTimer(Instant timestamp, TimeDomain domain);
-
-    /**
-     * Create an {@code OnMergeContext} for executing the given trigger. Overrides
-     * {@link TriggerContext#forTrigger} with a narrower return type.
-     *
-     * @param trigger the executable trigger the returned context is created for
-     */
-    @Override
-    public abstract OnMergeContext forTrigger(ExecutableTrigger trigger);
-
-    /** Returns the state accessor, narrowed here to a {@link MergingStateAccessor}. */
-    @Override
-    public abstract MergingStateAccessor<?, ?> state();
-
-    /** Returns the trigger info, narrowed here to a {@link MergingTriggerInfo}. */
-    @Override
-    public abstract MergingTriggerInfo trigger();
-  }
-
-  /** The sub-triggers handed to the constructor; may be {@code null}. */
-  @Nullable
-  protected final List<Trigger> subTriggers;
-
-  /**
-   * Creates a trigger with the given sub-triggers.
-   *
-   * @param subTriggers the sub-triggers, or {@code null}; the list is stored by reference and
-   *     is not copied
-   */
-  protected Trigger(@Nullable List<Trigger> subTriggers) {
-    this.subTriggers = subTriggers;
-  }
-
-
-  /**
-   * Called every time an element is incorporated into a window.
-   *
-   * @param c the context for the element being processed
-   */
-  public abstract void onElement(OnElementContext c) throws Exception;
-
-  /**
-   * Called immediately after windows have been merged.
-   *
-   * <p>Leaf triggers should update their state by inspecting their status and any state
-   * in the merging windows. Composite triggers should update their state by calling
-   * {@link ExecutableTrigger#invokeOnMerge} on their sub-triggers, and applying appropriate logic.
-   *
-   * <p>A trigger such as {@link AfterWatermark#pastEndOfWindow} may no longer be finished;
-   * it is the responsibility of the trigger itself to record this fact. It is forbidden for
-   * a trigger to become finished due to {@link #onMerge}, as it has not yet fired the pending
-   * elements that led to it being ready to fire.
-   *
-   * <p>The implementation does not need to clear out any state associated with the old windows.
-   *
-   * @param c the context for the merge
-   */
-  public abstract void onMerge(OnMergeContext c) throws Exception;
-
-  /**
-   * Returns {@code true} if the current state of the trigger indicates that its condition
-   * is satisfied and it is ready to fire.
-   *
-   * @param context the context in which the trigger is being evaluated
-   */
-  public abstract boolean shouldFire(TriggerContext context) throws Exception;
-
-  /**
-   * Adjusts the state of the trigger to be ready for the next pane. For example, a
-   * {@link Repeatedly} trigger will reset its inner trigger, since it has fired.
-   *
-   * <p>If the trigger is finished, it is the responsibility of the trigger itself to
-   * record that fact via the {@code context}.
-   *
-   * @param context the context in which the trigger fired
-   */
-  public abstract void onFire(TriggerContext context) throws Exception;
-
-  /**
-   * Gives this trigger a chance to prefetch state that an upcoming {@link #onElement} call is
-   * likely to read. This base implementation forwards the request to each sub-trigger, if any.
-   */
-  public void prefetchOnElement(StateAccessor<?> state) {
-    if (subTriggers == null) {
-      return;
-    }
-    for (Trigger child : subTriggers) {
-      child.prefetchOnElement(state);
-    }
-  }
-
-  /**
-   * Gives this trigger a chance to prefetch state that an upcoming {@link #onMerge} call is
-   * likely to read. This base implementation forwards the request to each sub-trigger, if any.
-   */
-  public void prefetchOnMerge(MergingStateAccessor<?, ?> state) {
-    if (subTriggers == null) {
-      return;
-    }
-    for (Trigger child : subTriggers) {
-      child.prefetchOnMerge(state);
-    }
-  }
-
-  /**
-   * Gives this trigger a chance to prefetch state that an upcoming {@link #shouldFire} call is
-   * likely to read. This base implementation forwards the request to each sub-trigger, if any.
-   */
-  public void prefetchShouldFire(StateAccessor<?> state) {
-    if (subTriggers == null) {
-      return;
-    }
-    for (Trigger child : subTriggers) {
-      child.prefetchShouldFire(state);
-    }
-  }
-
-  /**
-   * Gives this trigger a chance to prefetch state that an upcoming {@link #onFire} call is
-   * likely to read. This base implementation forwards the request to each sub-trigger, if any.
-   */
-  public void prefetchOnFire(StateAccessor<?> state) {
-    if (subTriggers == null) {
-      return;
-    }
-    for (Trigger child : subTriggers) {
-      child.prefetchOnFire(state);
-    }
-  }
-
-  /**
-   * Clears any state this trigger holds for the given window.
-   *
-   * <p>Invoked once a trigger has indicated it will never fire again. The trigger system keeps
-   * enough information to know that the trigger is finished, so the trigger should drop all of
-   * its own state. This base implementation, when sub-triggers are present, invokes clear on
-   * each executable sub-trigger reported by the context.
-   */
-  public void clear(TriggerContext c) throws Exception {
-    if (subTriggers == null) {
-      return;
-    }
-    for (ExecutableTrigger child : c.trigger().subTriggers()) {
-      child.invokeClear(c);
-    }
-  }
-
-  /**
-   * Returns the sub-triggers of this trigger exactly as stored, without copying.
-   *
-   * @return the stored sub-trigger list, which may be {@code null}; callers must check before
-   *     iterating
-   */
-  public Iterable<Trigger> subTriggers() {
-    return subTriggers;
-  }
-
-  /**
-   * Builds the trigger to use after a {@code GroupByKey} so that the intention of this trigger
-   * is preserved: time-based triggers meant to provide speculative results keep doing so, and
-   * triggers that fire once (or multiple times) keep firing once (or multiple times).
-   *
-   * <p>When this trigger has sub-triggers, their continuations are computed first, in order,
-   * and handed to {@link #getContinuationTrigger(List)}; otherwise {@code null} is passed.
-   */
-  public Trigger getContinuationTrigger() {
-    List<Trigger> continuations = null;
-    if (subTriggers != null) {
-      continuations = new ArrayList<>();
-      for (Trigger child : subTriggers) {
-        continuations.add(child.getContinuationTrigger());
-      }
-    }
-    return getContinuationTrigger(continuations);
-  }
-
-  /**
-   * Return the {@link #getContinuationTrigger} of this {@code Trigger}. For convenience, this
-   * is provided the continuation trigger of each of the sub-triggers.
-   *
-   * @param continuationTriggers the continuation triggers of the sub-triggers, in sub-trigger
-   *     order; {@code null} when this trigger's sub-trigger list is {@code null}
-   */
-  protected abstract Trigger getContinuationTrigger(List<Trigger> continuationTriggers);
-
-  /**
-   * Returns a bound in watermark time by which this trigger would have fired at least once
-   * for a given window had there been input data.  This is a static property of a trigger
-   * that does not depend on its state.
-   *
-   * <p>For triggers that do not fire based on the watermark advancing, returns
-   * {@link BoundedWindow#TIMESTAMP_MAX_VALUE}.
-   *
-   * <p>This estimate is used to determine that there are no elements in a side-input window, which
-   * causes the default value to be used instead.
-   *
-   * @param window the window for which the bound is computed
-   * @return the watermark bound for {@code window}
-   */
-  public abstract Instant getWatermarkThatGuaranteesFiring(BoundedWindow window);
-
-  /**
-   * Reports whether this trigger performs the same triggering as {@code other}: both must have
-   * the same runtime class and pairwise-compatible sub-triggers.
-   *
-   * @param other the trigger to compare against; must not be {@code null}
-   */
-  public boolean isCompatible(Trigger other) {
-    if (!getClass().equals(other.getClass())) {
-      return false;
-    }
-
-    // A missing sub-trigger list only matches another missing list.
-    if (subTriggers == null || other.subTriggers == null) {
-      return subTriggers == null && other.subTriggers == null;
-    }
-
-    final int count = subTriggers.size();
-    if (count != other.subTriggers.size()) {
-      return false;
-    }
-
-    for (int i = 0; i < count; i++) {
-      Trigger mine = subTriggers.get(i);
-      Trigger theirs = other.subTriggers.get(i);
-      if (!mine.isCompatible(theirs)) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  @Override
-  public String toString() {
-    // Nested trigger classes are rendered as "Outer.Inner".
-    Class<?> self = getClass();
-    Class<?> outer = self.getEnclosingClass();
-    String name = (outer == null)
-        ? self.getSimpleName()
-        : outer.getSimpleName() + "." + self.getSimpleName();
-
-    boolean hasSubTriggers = subTriggers != null && subTriggers.size() != 0;
-    return hasSubTriggers
-        ? name + "(" + Joiner.on(", ").join(subTriggers) + ")"
-        : name;
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (this == obj) {
-      return true;
-    }
-    if (obj instanceof Trigger) {
-      // Equal only when both the runtime class and the sub-trigger lists match.
-      Trigger other = (Trigger) obj;
-      return Objects.equals(getClass(), other.getClass())
-          && Objects.equals(subTriggers, other.subTriggers);
-    }
-    return false;
-  }
-
-  /**
-   * Hashes the runtime class together with the sub-trigger list, the same two values that
-   * {@link #equals} compares.
-   */
-  @Override
-  public int hashCode() {
-    return Objects.hash(getClass(), subTriggers);
-  }
-
-  /**
-   * Specify an ending condition for this trigger. If the {@code until} trigger fires then the
-   * combination fires.
-   *
-   * <p>The expression {@code t1.orFinally(t2)} fires every time {@code t1} fires, and finishes
-   * as soon as either {@code t1} finishes or {@code t2} fires, in which case it fires one last time
-   * for {@code t2}. Both {@code t1} and {@code t2} are executed in parallel. This means that
-   * {@code t1} may have fired since {@code t2} started, so not all of the elements that {@code t2}
-   * has seen are necessarily in the current pane.
-   *
-   * <p>For example the final firing of the following trigger may only have 1 element:
-   * <pre> {@code
-   * Repeatedly.forever(AfterPane.elementCountAtLeast(2))
-   *     .orFinally(AfterPane.elementCountAtLeast(5))
-   * } </pre>
-   *
-   * <p>Note that if {@code t1} is {@link OnceTrigger}, then {@code t1.orFinally(t2)} is the same
-   * as {@code AfterFirst.of(t1, t2)}.
-   *
-   * @param until the trigger whose firing ends the combination
-   * @return a new {@code OrFinallyTrigger} built from this trigger and {@code until}
-   */
-  public Trigger orFinally(OnceTrigger until) {
-    return new OrFinallyTrigger(this, until);
-  }
-
-  /**
-   * {@link Trigger}s that are guaranteed to fire at most once should extend from this, rather
-   * than the general {@link Trigger} class to indicate that behavior.
-   */
-  public abstract static class OnceTrigger extends Trigger {
-    /** Passes the given sub-triggers straight through to {@link Trigger}. */
-    protected OnceTrigger(List<Trigger> subTriggers) {
-      super(subTriggers);
-    }
-
-    /**
-     * Computes the continuation exactly as {@link Trigger#getContinuationTrigger()} does and
-     * narrows the result to {@code OnceTrigger}.
-     *
-     * @throws IllegalStateException if the computed continuation is not a {@code OnceTrigger}
-     */
-    @Override
-    public final OnceTrigger getContinuationTrigger() {
-      Trigger candidate = super.getContinuationTrigger();
-      if (candidate instanceof OnceTrigger) {
-        return (OnceTrigger) candidate;
-      }
-      throw new IllegalStateException("Continuation of a OnceTrigger must be a OnceTrigger");
-    }
-
-    /**
-     * Runs {@link #onOnlyFiring} and then marks this trigger finished through the context.
-     */
-    @Override
-    public final void onFire(TriggerContext context) throws Exception {
-      onOnlyFiring(context);
-      context.trigger().setFinished(true);
-    }
-
-    /**
-     * Hook invoked by {@link #onFire} just before the trigger is marked finished. Subclasses
-     * supply their firing logic here.
-     */
-    protected abstract void onOnlyFiring(TriggerContext context) throws Exception;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutableTrigger.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutableTrigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutableTrigger.java
deleted file mode 100644
index 088c499..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutableTrigger.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.Trigger;
-import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
-
-/**
- * A wrapper around a trigger used during execution. While an actual trigger may appear multiple
- * times (both in the same trigger expression and in other trigger expressions), the
- * {@code ExecutableTrigger} wrapped around them forms a tree (only one occurrence).
- */
-public class ExecutableTrigger implements Serializable {
-
-  /** Index assigned to this node of the executable tree. */
-  private final int triggerIndex;
-  /** First index that is not used by this node or anything below it. */
-  private final int firstIndexAfterSubtree;
-  /** Executable wrappers for the sub-triggers of the wrapped spec, in spec order. */
-  private final List<ExecutableTrigger> subTriggers = new ArrayList<>();
-  /** The trigger specification this node wraps. */
-  private final Trigger trigger;
-
-  /** Wraps {@code trigger} as the root of an executable tree, numbering nodes from 0. */
-  public static <W extends BoundedWindow> ExecutableTrigger create(Trigger trigger) {
-    return create(trigger, 0);
-  }
-
-  /** Picks the wrapper class based on whether the spec is a {@link OnceTrigger}. */
-  private static <W extends BoundedWindow> ExecutableTrigger create(
-      Trigger trigger, int nextUnusedIndex) {
-    return (trigger instanceof OnceTrigger)
-        ? new ExecutableOnceTrigger((OnceTrigger) trigger, nextUnusedIndex)
-        : new ExecutableTrigger(trigger, nextUnusedIndex);
-  }
-
-  /** Wraps a {@link OnceTrigger} spec, numbering nodes from {@code nextUnusedIndex}. */
-  public static <W extends BoundedWindow> ExecutableTrigger createForOnceTrigger(
-      OnceTrigger trigger, int nextUnusedIndex) {
-    return new ExecutableOnceTrigger(trigger, nextUnusedIndex);
-  }
-
-  private ExecutableTrigger(Trigger trigger, int nextUnusedIndex) {
-    this.trigger = checkNotNull(trigger, "trigger must not be null");
-    this.triggerIndex = nextUnusedIndex;
-
-    // This node takes the first index; each child subtree then takes the indices that follow
-    // the previous child's subtree.
-    int cursor = nextUnusedIndex + 1;
-    if (trigger.subTriggers() != null) {
-      for (Trigger childSpec : trigger.subTriggers()) {
-        ExecutableTrigger child = create(childSpec, cursor);
-        subTriggers.add(child);
-        cursor = child.firstIndexAfterSubtree;
-      }
-    }
-    this.firstIndexAfterSubtree = cursor;
-  }
-
-  /** Returns the internal list of executable sub-triggers (not a copy). */
-  public List<ExecutableTrigger> subTriggers() {
-    return subTriggers;
-  }
-
-  @Override
-  public String toString() {
-    return trigger.toString();
-  }
-
-  /**
-   * Returns the trigger specification wrapped by this {@code ExecutableTrigger}.
-   */
-  public Trigger getSpec() {
-    return trigger;
-  }
-
-  /** Returns the index assigned to this node. */
-  public int getTriggerIndex() {
-    return triggerIndex;
-  }
-
-  /** Returns the first index not used by this node or its subtree. */
-  public final int getFirstIndexAfterSubtree() {
-    return firstIndexAfterSubtree;
-  }
-
-  /** Delegates to {@link Trigger#isCompatible} on the two wrapped specs. */
-  public boolean isCompatible(ExecutableTrigger other) {
-    return trigger.isCompatible(other.trigger);
-  }
-
-  /**
-   * Returns the direct sub-trigger whose subtree holds {@code index}.
-   *
-   * <p>The index must lie strictly after this node's own index and before
-   * {@link #getFirstIndexAfterSubtree()}; otherwise the state check fails.
-   */
-  public ExecutableTrigger getSubTriggerContaining(int index) {
-    checkNotNull(subTriggers);
-    checkState(triggerIndex < index && firstIndexAfterSubtree > index,
-        "Cannot find sub-trigger containing index not in this tree.");
-    // Children are stored in increasing index order, so the match is the last child whose
-    // own index does not exceed the requested one.
-    ExecutableTrigger containing = null;
-    for (ExecutableTrigger candidate : subTriggers) {
-      if (candidate.triggerIndex > index) {
-        break;
-      }
-      containing = candidate;
-    }
-    return containing;
-  }
-
-  /**
-   * Calls {@link Trigger#onElement} on the wrapped trigger, using the sub-context that
-   * {@code c} creates for this executable trigger.
-   */
-  public void invokeOnElement(Trigger.OnElementContext c) throws Exception {
-    Trigger.OnElementContext scoped = c.forTrigger(this);
-    trigger.onElement(scoped);
-  }
-
-  /**
-   * Calls {@link Trigger#onMerge} on the wrapped trigger, using the sub-context that
-   * {@code c} creates for this executable trigger.
-   */
-  public void invokeOnMerge(Trigger.OnMergeContext c) throws Exception {
-    trigger.onMerge(c.forTrigger(this));
-  }
-
-  /**
-   * Calls {@link Trigger#shouldFire} on the wrapped trigger, using the sub-context that
-   * {@code c} creates for this executable trigger, and returns its answer.
-   */
-  public boolean invokeShouldFire(Trigger.TriggerContext c) throws Exception {
-    Trigger.TriggerContext scoped = c.forTrigger(this);
-    return trigger.shouldFire(scoped);
-  }
-
-  /**
-   * Calls {@link Trigger#onFire} on the wrapped trigger, using the sub-context that
-   * {@code c} creates for this executable trigger.
-   */
-  public void invokeOnFire(Trigger.TriggerContext c) throws Exception {
-    Trigger.TriggerContext scoped = c.forTrigger(this);
-    trigger.onFire(scoped);
-  }
-
-  /**
-   * Calls {@link Trigger#clear} on the wrapped trigger, using the sub-context that
-   * {@code c} creates for this executable trigger.
-   */
-  public void invokeClear(Trigger.TriggerContext c) throws Exception {
-    Trigger.TriggerContext scoped = c.forTrigger(this);
-    trigger.clear(scoped);
-  }
-
-  /**
-   * Wrapper type used for {@link OnceTrigger} specs. It adds no fields or methods of its own;
-   * it only distinguishes once-triggers by class.
-   */
-  private static class ExecutableOnceTrigger extends ExecutableTrigger {
-
-    public ExecutableOnceTrigger(OnceTrigger trigger, int nextUnusedIndex) {
-      super(trigger, nextUnusedIndex);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FinishedTriggers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FinishedTriggers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FinishedTriggers.java
deleted file mode 100644
index 6666ab9..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FinishedTriggers.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-/**
- * A mutable set which tracks whether any particular {@link ExecutableTrigger} is
- * finished.
- */
-public interface FinishedTriggers {
-  /**
-   * Reports whether the given trigger is currently recorded as finished.
-   */
-  boolean isFinished(ExecutableTrigger trigger);
-
-  /**
-   * Records whether the given trigger is finished.
-   *
-   * @param trigger the trigger whose status is being recorded
-   * @param value {@code true} to mark it finished, {@code false} to mark it unfinished
-   */
-  void setFinished(ExecutableTrigger trigger, boolean value);
-
-  /**
-   * Marks the trigger and all of its subtriggers as unfinished.
-   */
-  void clearRecursively(ExecutableTrigger trigger);
-
-  /**
-   * Returns an independent copy of this mutable {@link FinishedTriggers}.
-   */
-  FinishedTriggers copy();
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FinishedTriggersBitSet.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FinishedTriggersBitSet.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FinishedTriggersBitSet.java
deleted file mode 100644
index 4cd617f..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FinishedTriggersBitSet.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import java.util.BitSet;
-
-/**
- * A {@link FinishedTriggers} implementation based on an underlying {@link BitSet}.
- */
-public class FinishedTriggersBitSet implements FinishedTriggers {
-
-  /** One bit per trigger index; a set bit means that trigger is finished. */
-  private final BitSet bitSet;
-
-  private FinishedTriggersBitSet(BitSet bitSet) {
-    this.bitSet = bitSet;
-  }
-
-  /** Creates an instance backed by a fresh {@code new BitSet(capacity)}. */
-  public static FinishedTriggersBitSet emptyWithCapacity(int capacity) {
-    BitSet empty = new BitSet(capacity);
-    return new FinishedTriggersBitSet(empty);
-  }
-
-  /** Wraps the given {@link BitSet} directly; the bit set is shared, not copied. */
-  public static FinishedTriggersBitSet fromBitSet(BitSet bitSet) {
-    return new FinishedTriggersBitSet(bitSet);
-  }
-
-  /**
-   * Exposes the {@link BitSet} backing this {@link FinishedTriggersBitSet} (the same object,
-   * not a copy).
-   */
-  public BitSet getBitSet() {
-    return bitSet;
-  }
-
-  @Override
-  public boolean isFinished(ExecutableTrigger trigger) {
-    int index = trigger.getTriggerIndex();
-    return bitSet.get(index);
-  }
-
-  @Override
-  public void setFinished(ExecutableTrigger trigger, boolean value) {
-    int index = trigger.getTriggerIndex();
-    bitSet.set(index, value);
-  }
-
-  @Override
-  public void clearRecursively(ExecutableTrigger trigger) {
-    // Clears the bits from the trigger's own index up to, but excluding, the first index
-    // after its subtree.
-    int from = trigger.getTriggerIndex();
-    int toExclusive = trigger.getFirstIndexAfterSubtree();
-    bitSet.clear(from, toExclusive);
-  }
-
-  @Override
-  public FinishedTriggersBitSet copy() {
-    BitSet snapshot = (BitSet) bitSet.clone();
-    return new FinishedTriggersBitSet(snapshot);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FinishedTriggersSet.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FinishedTriggersSet.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FinishedTriggersSet.java
deleted file mode 100644
index a9feb73..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FinishedTriggersSet.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import com.google.common.collect.Sets;
-import java.util.Set;
-
-/**
- * An implementation of {@link FinishedTriggers} atop a user-provided mutable {@link Set}.
- */
-public class FinishedTriggersSet implements FinishedTriggers {
-
-  private final Set<ExecutableTrigger> finishedTriggers;
-
-  private FinishedTriggersSet(Set<ExecutableTrigger> finishedTriggers) {
-    this.finishedTriggers = finishedTriggers;
-  }
-
-  public static FinishedTriggersSet fromSet(Set<ExecutableTrigger> finishedTriggers) {
-    return new FinishedTriggersSet(finishedTriggers);
-  }
-
-  /**
-   * Returns a mutable {@link Set} of the underlying triggers that are finished.
-   */
-  public Set<ExecutableTrigger> getFinishedTriggers() {
-    return finishedTriggers;
-  }
-
-  @Override
-  public boolean isFinished(ExecutableTrigger trigger) {
-    return finishedTriggers.contains(trigger);
-  }
-
-  @Override
-  public void setFinished(ExecutableTrigger trigger, boolean value) {
-    if (value) {
-      finishedTriggers.add(trigger);
-    } else {
-      finishedTriggers.remove(trigger);
-    }
-  }
-
-  @Override
-  public void clearRecursively(ExecutableTrigger trigger) {
-    finishedTriggers.remove(trigger);
-    for (ExecutableTrigger subTrigger : trigger.subTriggers()) {
-      clearRecursively(subTrigger);
-    }
-  }
-
-  @Override
-  public FinishedTriggersSet copy() {
-    return fromSet(Sets.newHashSet(finishedTriggers));
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReshuffleTrigger.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReshuffleTrigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReshuffleTrigger.java
deleted file mode 100644
index 9e2c27d..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReshuffleTrigger.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import java.util.List;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.Trigger;
-import org.joda.time.Instant;
-
-/**
- * The trigger used with {@link Reshuffle} which triggers on every element
- * and never buffers state.
- *
- * @param <W> The kind of window that is being reshuffled.
- */
-public class ReshuffleTrigger<W extends BoundedWindow> extends Trigger {
-
-  public ReshuffleTrigger() {
-    super(null);
-  }
-
-  @Override
-  public void onElement(Trigger.OnElementContext c) { }
-
-  @Override
-  public void onMerge(Trigger.OnMergeContext c) { }
-
-  @Override
-  protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
-    return this;
-  }
-
-  @Override
-  public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
-    throw new UnsupportedOperationException(
-        "ReshuffleTrigger should not be used outside of Reshuffle");
-  }
-
-  @Override
-  public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
-    return true;
-  }
-
-  @Override
-  public void onFire(Trigger.TriggerContext context) throws Exception { }
-
-  @Override
-  public String toString() {
-    return "ReshuffleTrigger()";
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TriggerContextFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TriggerContextFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TriggerContextFactory.java
deleted file mode 100644
index e09aac2..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TriggerContextFactory.java
+++ /dev/null
@@ -1,507 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import com.google.common.base.Predicate;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
-import java.util.Collection;
-import java.util.Map;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.Trigger;
-import org.apache.beam.sdk.transforms.windowing.Trigger.MergingTriggerInfo;
-import org.apache.beam.sdk.transforms.windowing.Trigger.TriggerInfo;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.state.MergingStateAccessor;
-import org.apache.beam.sdk.util.state.State;
-import org.apache.beam.sdk.util.state.StateAccessor;
-import org.apache.beam.sdk.util.state.StateInternals;
-import org.apache.beam.sdk.util.state.StateNamespace;
-import org.apache.beam.sdk.util.state.StateNamespaces;
-import org.apache.beam.sdk.util.state.StateTag;
-import org.joda.time.Instant;
-
-/**
- * Factory for creating instances of the various {@link Trigger} contexts.
- *
- * <p>These contexts are highly interdependent and share many fields; it is inadvisable
- * to create them via any means other than this factory class.
- */
-public class TriggerContextFactory<W extends BoundedWindow> {
-
-  private final WindowFn<?, W> windowFn;
-  private StateInternals<?> stateInternals;
-  private final Coder<W> windowCoder;
-
-  public TriggerContextFactory(WindowFn<?, W> windowFn,
-      StateInternals<?> stateInternals, ActiveWindowSet<W> activeWindows) {
-    // Future triggers may be able to exploit the active window to state address window mapping.
-    this.windowFn = windowFn;
-    this.stateInternals = stateInternals;
-    this.windowCoder = windowFn.windowCoder();
-  }
-
-  public Trigger.TriggerContext base(W window, Timers timers,
-      ExecutableTrigger rootTrigger, FinishedTriggers finishedSet) {
-    return new TriggerContextImpl(window, timers, rootTrigger, finishedSet);
-  }
-
-  public Trigger.OnElementContext createOnElementContext(
-      W window, Timers timers, Instant elementTimestamp,
-      ExecutableTrigger rootTrigger, FinishedTriggers finishedSet) {
-    return new OnElementContextImpl(window, timers, rootTrigger, finishedSet, elementTimestamp);
-  }
-
-  public Trigger.OnMergeContext createOnMergeContext(W window, Timers timers,
-      ExecutableTrigger rootTrigger, FinishedTriggers finishedSet,
-      Map<W, FinishedTriggers> finishedSets) {
-    return new OnMergeContextImpl(window, timers, rootTrigger, finishedSet, finishedSets);
-  }
-
-  public StateAccessor<?> createStateAccessor(W window, ExecutableTrigger trigger) {
-    return new StateAccessorImpl(window, trigger);
-  }
-
-  public MergingStateAccessor<?, W> createMergingStateAccessor(
-      W mergeResult, Collection<W> mergingWindows, ExecutableTrigger trigger) {
-    return new MergingStateAccessorImpl(trigger, mergingWindows, mergeResult);
-  }
-
-  private class TriggerInfoImpl implements Trigger.TriggerInfo {
-
-    protected final ExecutableTrigger trigger;
-    protected final FinishedTriggers finishedSet;
-    private final Trigger.TriggerContext context;
-
-    public TriggerInfoImpl(ExecutableTrigger trigger, FinishedTriggers finishedSet,
-        Trigger.TriggerContext context) {
-      this.trigger = trigger;
-      this.finishedSet = finishedSet;
-      this.context = context;
-    }
-
-    @Override
-    public boolean isMerging() {
-      return !windowFn.isNonMerging();
-    }
-
-    @Override
-    public Iterable<ExecutableTrigger> subTriggers() {
-      return trigger.subTriggers();
-    }
-
-    @Override
-    public ExecutableTrigger subTrigger(int subtriggerIndex) {
-      return trigger.subTriggers().get(subtriggerIndex);
-    }
-
-    @Override
-    public boolean isFinished() {
-      return finishedSet.isFinished(trigger);
-    }
-
-    @Override
-    public boolean isFinished(int subtriggerIndex) {
-      return finishedSet.isFinished(subTrigger(subtriggerIndex));
-    }
-
-    @Override
-    public boolean areAllSubtriggersFinished() {
-      return Iterables.isEmpty(unfinishedSubTriggers());
-    }
-
-    @Override
-    public Iterable<ExecutableTrigger> unfinishedSubTriggers() {
-      return FluentIterable
-          .from(trigger.subTriggers())
-          .filter(new Predicate<ExecutableTrigger>() {
-            @Override
-            public boolean apply(ExecutableTrigger trigger) {
-              return !finishedSet.isFinished(trigger);
-            }
-          });
-    }
-
-    @Override
-    public ExecutableTrigger firstUnfinishedSubTrigger() {
-      for (ExecutableTrigger subTrigger : trigger.subTriggers()) {
-        if (!finishedSet.isFinished(subTrigger)) {
-          return subTrigger;
-        }
-      }
-      return null;
-    }
-
-    @Override
-    public void resetTree() throws Exception {
-      finishedSet.clearRecursively(trigger);
-      trigger.invokeClear(context);
-    }
-
-    @Override
-    public void setFinished(boolean finished) {
-      finishedSet.setFinished(trigger, finished);
-    }
-
-    @Override
-    public void setFinished(boolean finished, int subTriggerIndex) {
-      finishedSet.setFinished(subTrigger(subTriggerIndex), finished);
-    }
-  }
-
-  private class TriggerTimers implements Timers {
-
-    private final Timers timers;
-    private final W window;
-
-    public TriggerTimers(W window, Timers timers) {
-      this.timers = timers;
-      this.window = window;
-    }
-
-    @Override
-    public void setTimer(Instant timestamp, TimeDomain timeDomain) {
-      timers.setTimer(timestamp, timeDomain);
-    }
-
-    @Override
-    public void deleteTimer(Instant timestamp, TimeDomain timeDomain) {
-      if (timeDomain == TimeDomain.EVENT_TIME
-          && timestamp.equals(window.maxTimestamp())) {
-        // Don't allow triggers to unset the at-max-timestamp timer. This is necessary for on-time
-        // state transitions.
-        return;
-      }
-      timers.deleteTimer(timestamp, timeDomain);
-    }
-
-    @Override
-    public Instant currentProcessingTime() {
-      return timers.currentProcessingTime();
-    }
-
-    @Override
-    @Nullable
-    public Instant currentSynchronizedProcessingTime() {
-      return timers.currentSynchronizedProcessingTime();
-    }
-
-    @Override
-    public Instant currentEventTime() {
-      return timers.currentEventTime();
-    }
-  }
-
-  private class MergingTriggerInfoImpl
-      extends TriggerInfoImpl implements Trigger.MergingTriggerInfo {
-
-    private final Map<W, FinishedTriggers> finishedSets;
-
-    public MergingTriggerInfoImpl(
-        ExecutableTrigger trigger,
-        FinishedTriggers finishedSet,
-        Trigger.TriggerContext context,
-        Map<W, FinishedTriggers> finishedSets) {
-      super(trigger, finishedSet, context);
-      this.finishedSets = finishedSets;
-    }
-
-    @Override
-    public boolean finishedInAnyMergingWindow() {
-      for (FinishedTriggers finishedSet : finishedSets.values()) {
-        if (finishedSet.isFinished(trigger)) {
-          return true;
-        }
-      }
-      return false;
-    }
-
-    @Override
-    public boolean finishedInAllMergingWindows() {
-      for (FinishedTriggers finishedSet : finishedSets.values()) {
-        if (!finishedSet.isFinished(trigger)) {
-          return false;
-        }
-      }
-      return true;
-    }
-  }
-
-  private class StateAccessorImpl implements StateAccessor<Object> {
-    protected final int triggerIndex;
-    protected final StateNamespace windowNamespace;
-
-    public StateAccessorImpl(
-        W window,
-        ExecutableTrigger trigger) {
-      this.triggerIndex = trigger.getTriggerIndex();
-      this.windowNamespace = namespaceFor(window);
-    }
-
-    protected StateNamespace namespaceFor(W window) {
-      return StateNamespaces.windowAndTrigger(windowCoder, window, triggerIndex);
-    }
-
-    @Override
-    public <StateT extends State> StateT access(StateTag<? super Object, StateT> address) {
-      return stateInternals.state(windowNamespace, address);
-    }
-  }
-
-  private class MergingStateAccessorImpl extends StateAccessorImpl
-  implements MergingStateAccessor<Object, W> {
-    private final Collection<W> activeToBeMerged;
-
-    public MergingStateAccessorImpl(ExecutableTrigger trigger, Collection<W> activeToBeMerged,
-        W mergeResult) {
-      super(mergeResult, trigger);
-      this.activeToBeMerged = activeToBeMerged;
-    }
-
-    @Override
-    public <StateT extends State> StateT access(
-        StateTag<? super Object, StateT> address) {
-      return stateInternals.state(windowNamespace, address);
-    }
-
-    @Override
-    public <StateT extends State> Map<W, StateT> accessInEachMergingWindow(
-        StateTag<? super Object, StateT> address) {
-      ImmutableMap.Builder<W, StateT> builder = ImmutableMap.builder();
-      for (W mergingWindow : activeToBeMerged) {
-        StateT stateForWindow = stateInternals.state(namespaceFor(mergingWindow), address);
-        builder.put(mergingWindow, stateForWindow);
-      }
-      return builder.build();
-    }
-  }
-
-  private class TriggerContextImpl extends Trigger.TriggerContext {
-
-    private final W window;
-    private final StateAccessorImpl state;
-    private final Timers timers;
-    private final TriggerInfoImpl triggerInfo;
-
-    private TriggerContextImpl(
-        W window,
-        Timers timers,
-        ExecutableTrigger trigger,
-        FinishedTriggers finishedSet) {
-      trigger.getSpec().super();
-      this.window = window;
-      this.state = new StateAccessorImpl(window, trigger);
-      this.timers = new TriggerTimers(window, timers);
-      this.triggerInfo = new TriggerInfoImpl(trigger, finishedSet, this);
-    }
-
-    @Override
-    public Trigger.TriggerContext forTrigger(ExecutableTrigger trigger) {
-      return new TriggerContextImpl(window, timers, trigger, triggerInfo.finishedSet);
-    }
-
-    @Override
-    public TriggerInfo trigger() {
-      return triggerInfo;
-    }
-
-    @Override
-    public StateAccessor<?> state() {
-      return state;
-    }
-
-    @Override
-    public W window() {
-      return window;
-    }
-
-    @Override
-    public void deleteTimer(Instant timestamp, TimeDomain domain) {
-      timers.deleteTimer(timestamp, domain);
-    }
-
-    @Override
-    public Instant currentProcessingTime() {
-      return timers.currentProcessingTime();
-    }
-
-    @Override
-    @Nullable
-    public Instant currentSynchronizedProcessingTime() {
-      return timers.currentSynchronizedProcessingTime();
-    }
-
-    @Override
-    @Nullable
-    public Instant currentEventTime() {
-      return timers.currentEventTime();
-    }
-  }
-
-  private class OnElementContextImpl extends Trigger.OnElementContext {
-
-    private final W window;
-    private final StateAccessorImpl state;
-    private final Timers timers;
-    private final TriggerInfoImpl triggerInfo;
-    private final Instant eventTimestamp;
-
-    private OnElementContextImpl(
-        W window,
-        Timers timers,
-        ExecutableTrigger trigger,
-        FinishedTriggers finishedSet,
-        Instant eventTimestamp) {
-      trigger.getSpec().super();
-      this.window = window;
-      this.state = new StateAccessorImpl(window, trigger);
-      this.timers = new TriggerTimers(window, timers);
-      this.triggerInfo = new TriggerInfoImpl(trigger, finishedSet, this);
-      this.eventTimestamp = eventTimestamp;
-    }
-
-
-    @Override
-    public Instant eventTimestamp() {
-      return eventTimestamp;
-    }
-
-    @Override
-    public Trigger.OnElementContext forTrigger(ExecutableTrigger trigger) {
-      return new OnElementContextImpl(
-          window, timers, trigger, triggerInfo.finishedSet, eventTimestamp);
-    }
-
-    @Override
-    public TriggerInfo trigger() {
-      return triggerInfo;
-    }
-
-    @Override
-    public StateAccessor<?> state() {
-      return state;
-    }
-
-    @Override
-    public W window() {
-      return window;
-    }
-
-    @Override
-    public void setTimer(Instant timestamp, TimeDomain domain) {
-      timers.setTimer(timestamp, domain);
-    }
-
-
-    @Override
-    public void deleteTimer(Instant timestamp, TimeDomain domain) {
-      timers.deleteTimer(timestamp, domain);
-    }
-
-    @Override
-    public Instant currentProcessingTime() {
-      return timers.currentProcessingTime();
-    }
-
-    @Override
-    @Nullable
-    public Instant currentSynchronizedProcessingTime() {
-      return timers.currentSynchronizedProcessingTime();
-    }
-
-    @Override
-    @Nullable
-    public Instant currentEventTime() {
-      return timers.currentEventTime();
-    }
-  }
-
-  private class OnMergeContextImpl extends Trigger.OnMergeContext {
-    private final MergingStateAccessor<?, W> state;
-    private final W window;
-    private final Collection<W> mergingWindows;
-    private final Timers timers;
-    private final MergingTriggerInfoImpl triggerInfo;
-
-    private OnMergeContextImpl(
-        W window,
-        Timers timers,
-        ExecutableTrigger trigger,
-        FinishedTriggers finishedSet,
-        Map<W, FinishedTriggers> finishedSets) {
-      trigger.getSpec().super();
-      this.mergingWindows = finishedSets.keySet();
-      this.window = window;
-      this.state = new MergingStateAccessorImpl(trigger, mergingWindows, window);
-      this.timers = new TriggerTimers(window, timers);
-      this.triggerInfo = new MergingTriggerInfoImpl(trigger, finishedSet, this, finishedSets);
-    }
-
-    @Override
-    public Trigger.OnMergeContext forTrigger(ExecutableTrigger trigger) {
-      return new OnMergeContextImpl(
-          window, timers, trigger, triggerInfo.finishedSet, triggerInfo.finishedSets);
-    }
-
-    @Override
-    public MergingStateAccessor<?, W> state() {
-      return state;
-    }
-
-    @Override
-    public MergingTriggerInfo trigger() {
-      return triggerInfo;
-    }
-
-    @Override
-    public W window() {
-      return window;
-    }
-
-    @Override
-    public void setTimer(Instant timestamp, TimeDomain domain) {
-      timers.setTimer(timestamp, domain);
-    }
-
-    @Override
-    public void deleteTimer(Instant timestamp, TimeDomain domain) {
-      timers.setTimer(timestamp, domain);
-
-    }
-
-    @Override
-    public Instant currentProcessingTime() {
-      return timers.currentProcessingTime();
-    }
-
-    @Override
-    @Nullable
-    public Instant currentSynchronizedProcessingTime() {
-      return timers.currentSynchronizedProcessingTime();
-    }
-
-    @Override
-    @Nullable
-    public Instant currentEventTime() {
-      return timers.currentEventTime();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterAllTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterAllTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterAllTest.java
deleted file mode 100644
index b591229..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterAllTest.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
-import org.apache.beam.sdk.util.TriggerTester;
-import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link AfterAll}.
- */
-@RunWith(JUnit4.class)
-public class AfterAllTest {
-
-  private SimpleTriggerTester<IntervalWindow> tester;
-
-  @Test
-  public void testT1FiresFirst() throws Exception {
-    tester = TriggerTester.forTrigger(
-        AfterAll.of(
-            AfterPane.elementCountAtLeast(1),
-            AfterPane.elementCountAtLeast(2)),
-        FixedWindows.of(Duration.millis(100)));
-
-    IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100));
-
-    tester.injectElements(1);
-    assertFalse(tester.shouldFire(window));
-
-    tester.injectElements(2);
-    assertTrue(tester.shouldFire(window));
-    tester.fireIfShouldFire(window);
-    assertTrue(tester.isMarkedFinished(window));
-  }
-
-  @Test
-  public void testT2FiresFirst() throws Exception {
-    tester = TriggerTester.forTrigger(
-        AfterAll.of(
-            AfterPane.elementCountAtLeast(2),
-            AfterPane.elementCountAtLeast(1)),
-        FixedWindows.of(Duration.millis(100)));
-
-    IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100));
-
-    tester.injectElements(1);
-    assertFalse(tester.shouldFire(window));
-
-    tester.injectElements(2);
-    assertTrue(tester.shouldFire(window));
-    tester.fireIfShouldFire(window);
-    assertTrue(tester.isMarkedFinished(window));
-  }
-
-  /**
-   * Tests that the AfterAll properly unsets finished bits when a merge causing it to become
-   * unfinished.
-   */
-  @Test
-  public void testOnMergeRewinds() throws Exception {
-    tester = TriggerTester.forTrigger(
-        AfterEach.inOrder(
-            AfterAll.of(
-                AfterWatermark.pastEndOfWindow(),
-                AfterPane.elementCountAtLeast(1)),
-            Repeatedly.forever(AfterPane.elementCountAtLeast(1))),
-        Sessions.withGapDuration(Duration.millis(10)));
-
-    tester.injectElements(1);
-    IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11));
-
-    tester.injectElements(5);
-    IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15));
-
-    // Finish the AfterAll in the first window
-    tester.advanceInputWatermark(new Instant(11));
-    assertTrue(tester.shouldFire(firstWindow));
-    assertFalse(tester.shouldFire(secondWindow));
-    tester.fireIfShouldFire(firstWindow);
-
-    // Merge them; the AfterAll should not be finished
-    tester.mergeWindows();
-    IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15));
-    assertFalse(tester.isMarkedFinished(mergedWindow));
-
-    // Confirm that we are back on the first trigger by probing that it is not ready to fire
-    // after an element (with merging)
-    tester.injectElements(3);
-    tester.mergeWindows();
-    assertFalse(tester.shouldFire(mergedWindow));
-
-    // Fire the AfterAll in the merged window
-    tester.advanceInputWatermark(new Instant(15));
-    assertTrue(tester.shouldFire(mergedWindow));
-    tester.fireIfShouldFire(mergedWindow);
-
-    // Confirm that we are on the second trigger by probing
-    tester.injectElements(2);
-    tester.mergeWindows();
-    assertTrue(tester.shouldFire(mergedWindow));
-    tester.fireIfShouldFire(mergedWindow);
-    tester.injectElements(2);
-    tester.mergeWindows();
-    assertTrue(tester.shouldFire(mergedWindow));
-    tester.fireIfShouldFire(mergedWindow);
-  }
-
-  @Test
-  public void testFireDeadline() throws Exception {
-    BoundedWindow window = new IntervalWindow(new Instant(0), new Instant(10));
-
-    assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE,
-        AfterAll.of(AfterWatermark.pastEndOfWindow(), AfterPane.elementCountAtLeast(1))
-            .getWatermarkThatGuaranteesFiring(window));
-  }
-
-  @Test
-  public void testContinuation() throws Exception {
-    OnceTrigger trigger1 = AfterProcessingTime.pastFirstElementInPane();
-    OnceTrigger trigger2 = AfterWatermark.pastEndOfWindow();
-    Trigger afterAll = AfterAll.of(trigger1, trigger2);
-    assertEquals(
-        AfterAll.of(trigger1.getContinuationTrigger(), trigger2.getContinuationTrigger()),
-        afterAll.getContinuationTrigger());
-  }
-
-  @Test
-  public void testToString() {
-    Trigger trigger = AfterAll.of(StubTrigger.named("t1"), StubTrigger.named("t2"));
-    assertEquals("AfterAll.of(t1, t2)", trigger.toString());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterEachTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterEachTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterEachTest.java
deleted file mode 100644
index c413c6e..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterEachTest.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
-import org.apache.beam.sdk.util.TriggerTester;
-import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.MockitoAnnotations;
-
-/**
- * Tests for {@link AfterEach}.
- */
-@RunWith(JUnit4.class)
-public class AfterEachTest {
-
-  private SimpleTriggerTester<IntervalWindow> tester;
-
-  @Before
-  public void initMocks() {
-    MockitoAnnotations.initMocks(this);
-  }
-
-  /**
-   * Tests that the {@link AfterEach} trigger fires and finishes the first trigger then the second.
-   */
-  @Test
-  public void testAfterEachInSequence() throws Exception {
-    tester = TriggerTester.forTrigger(
-        AfterEach.inOrder(
-            Repeatedly.forever(AfterPane.elementCountAtLeast(2))
-                .orFinally(AfterPane.elementCountAtLeast(3)),
-            Repeatedly.forever(AfterPane.elementCountAtLeast(5))
-                .orFinally(AfterWatermark.pastEndOfWindow())),
-            FixedWindows.of(Duration.millis(10)));
-
-    IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10));
-
-    // AfterCount(2) not ready
-    tester.injectElements(1);
-    assertFalse(tester.shouldFire(window));
-
-    // AfterCount(2) ready, not finished
-    tester.injectElements(2);
-    assertTrue(tester.shouldFire(window));
-    tester.fireIfShouldFire(window);
-    assertFalse(tester.isMarkedFinished(window));
-
-    // orFinally(AfterCount(3)) ready and will finish the first
-    tester.injectElements(1, 2, 3);
-    assertTrue(tester.shouldFire(window));
-    tester.fireIfShouldFire(window);
-    assertFalse(tester.isMarkedFinished(window));
-
-    // Now running as the second trigger
-    assertFalse(tester.shouldFire(window));
-    // This quantity of elements would fire and finish if it were erroneously still the first
-    tester.injectElements(1, 2, 3, 4);
-    assertFalse(tester.shouldFire(window));
-
-    // Now fire once
-    tester.injectElements(5);
-    assertTrue(tester.shouldFire(window));
-    tester.fireIfShouldFire(window);
-    assertFalse(tester.isMarkedFinished(window));
-
-    // This time advance the watermark to finish the whole mess.
-    tester.advanceInputWatermark(new Instant(10));
-    assertTrue(tester.shouldFire(window));
-    tester.fireIfShouldFire(window);
-    assertTrue(tester.isMarkedFinished(window));
-  }
-
-  @Test
-  public void testFireDeadline() throws Exception {
-    BoundedWindow window = new IntervalWindow(new Instant(0), new Instant(10));
-
-    assertEquals(new Instant(9),
-        AfterEach.inOrder(AfterWatermark.pastEndOfWindow(),
-                          AfterPane.elementCountAtLeast(4))
-            .getWatermarkThatGuaranteesFiring(window));
-
-    assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE,
-        AfterEach.inOrder(AfterPane.elementCountAtLeast(2), AfterWatermark.pastEndOfWindow())
-            .getWatermarkThatGuaranteesFiring(window));
-  }
-
-  @Test
-  public void testContinuation() throws Exception {
-    OnceTrigger trigger1 = AfterProcessingTime.pastFirstElementInPane();
-    OnceTrigger trigger2 = AfterWatermark.pastEndOfWindow();
-    Trigger afterEach = AfterEach.inOrder(trigger1, trigger2);
-    assertEquals(
-        Repeatedly.forever(AfterFirst.of(
-            trigger1.getContinuationTrigger(), trigger2.getContinuationTrigger())),
-        afterEach.getContinuationTrigger());
-  }
-
-  @Test
-  public void testToString() {
-    Trigger trigger = AfterEach.inOrder(
-        StubTrigger.named("t1"),
-        StubTrigger.named("t2"),
-        StubTrigger.named("t3"));
-
-    assertEquals("AfterEach.inOrder(t1, t2, t3)", trigger.toString());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterFirstTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterFirstTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterFirstTest.java
deleted file mode 100644
index 415060b..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterFirstTest.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.when;
-
-import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
-import org.apache.beam.sdk.util.TriggerTester;
-import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.MockitoAnnotations;
-
-/**
- * Tests for {@link AfterFirst}.
- */
-@RunWith(JUnit4.class)
-public class AfterFirstTest {
-
-  @Mock private OnceTrigger mockTrigger1;
-  @Mock private OnceTrigger mockTrigger2;
-  private SimpleTriggerTester<IntervalWindow> tester;
-  private static Trigger.TriggerContext anyTriggerContext() {
-    return Mockito.<Trigger.TriggerContext>any();
-  }
-
-  @Before
-  public void initMocks() {
-    MockitoAnnotations.initMocks(this);
-  }
-
-  @Test
-  public void testNeitherShouldFireFixedWindows() throws Exception {
-    tester = TriggerTester.forTrigger(
-        AfterFirst.of(mockTrigger1, mockTrigger2), FixedWindows.of(Duration.millis(10)));
-
-    tester.injectElements(1);
-    IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10));
-
-    when(mockTrigger1.shouldFire(anyTriggerContext())).thenReturn(false);
-    when(mockTrigger2.shouldFire(anyTriggerContext())).thenReturn(false);
-
-    assertFalse(tester.shouldFire(window)); // should not fire
-    assertFalse(tester.isMarkedFinished(window)); // not finished
-  }
-
-  @Test
-  public void testOnlyT1ShouldFireFixedWindows() throws Exception {
-    tester = TriggerTester.forTrigger(
-        AfterFirst.of(mockTrigger1, mockTrigger2), FixedWindows.of(Duration.millis(10)));
-    tester.injectElements(1);
-    IntervalWindow window = new IntervalWindow(new Instant(1), new Instant(11));
-
-    when(mockTrigger1.shouldFire(anyTriggerContext())).thenReturn(true);
-    when(mockTrigger2.shouldFire(anyTriggerContext())).thenReturn(false);
-
-    assertTrue(tester.shouldFire(window)); // should fire
-
-    tester.fireIfShouldFire(window);
-    assertTrue(tester.isMarkedFinished(window));
-  }
-
-  @Test
-  public void testOnlyT2ShouldFireFixedWindows() throws Exception {
-    tester = TriggerTester.forTrigger(
-    AfterFirst.of(mockTrigger1, mockTrigger2), FixedWindows.of(Duration.millis(10)));
-    tester.injectElements(1);
-    IntervalWindow window = new IntervalWindow(new Instant(1), new Instant(11));
-
-    when(mockTrigger1.shouldFire(anyTriggerContext())).thenReturn(false);
-    when(mockTrigger2.shouldFire(anyTriggerContext())).thenReturn(true);
-    assertTrue(tester.shouldFire(window)); // should fire
-
-    tester.fireIfShouldFire(window); // now finished
-    assertTrue(tester.isMarkedFinished(window));
-  }
-
-  @Test
-  public void testBothShouldFireFixedWindows() throws Exception {
-    tester = TriggerTester.forTrigger(
-    AfterFirst.of(mockTrigger1, mockTrigger2), FixedWindows.of(Duration.millis(10)));
-    tester.injectElements(1);
-    IntervalWindow window = new IntervalWindow(new Instant(1), new Instant(11));
-
-    when(mockTrigger1.shouldFire(anyTriggerContext())).thenReturn(true);
-    when(mockTrigger2.shouldFire(anyTriggerContext())).thenReturn(true);
-    assertTrue(tester.shouldFire(window)); // should fire
-
-    tester.fireIfShouldFire(window);
-    assertTrue(tester.isMarkedFinished(window));
-  }
-
-  /**
-   * Tests that if the first trigger rewinds to be non-finished in the merged window,
-   * then it becomes the currently active trigger again, with real triggers.
-   */
-  @Test
-  public void testShouldFireAfterMerge() throws Exception {
-    tester = TriggerTester.forTrigger(
-        AfterEach.inOrder(
-            AfterFirst.of(AfterPane.elementCountAtLeast(5),
-                AfterWatermark.pastEndOfWindow()),
-            Repeatedly.forever(AfterPane.elementCountAtLeast(1))),
-        Sessions.withGapDuration(Duration.millis(10)));
-
-    // Finished the AfterFirst in the first window
-    tester.injectElements(1);
-    IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11));
-    assertFalse(tester.shouldFire(firstWindow));
-    tester.advanceInputWatermark(new Instant(11));
-    assertTrue(tester.shouldFire(firstWindow));
-    tester.fireIfShouldFire(firstWindow);
-
-    // Set up second window where it is not done
-    tester.injectElements(5);
-    IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15));
-    assertFalse(tester.shouldFire(secondWindow));
-
-    // Merge them, if the merged window were on the second trigger, it would be ready
-    tester.mergeWindows();
-    IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15));
-    assertFalse(tester.shouldFire(mergedWindow));
-
-    // Now adding 3 more makes the AfterFirst ready to fire
-    tester.injectElements(1, 2, 3, 4, 5);
-    tester.mergeWindows();
-    assertTrue(tester.shouldFire(mergedWindow));
-  }
-
-  @Test
-  public void testFireDeadline() throws Exception {
-    BoundedWindow window = new IntervalWindow(new Instant(0), new Instant(10));
-
-    assertEquals(new Instant(9),
-        AfterFirst.of(AfterWatermark.pastEndOfWindow(), AfterPane.elementCountAtLeast(4))
-            .getWatermarkThatGuaranteesFiring(window));
-    assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE,
-        AfterFirst.of(AfterPane.elementCountAtLeast(2), AfterPane.elementCountAtLeast(1))
-            .getWatermarkThatGuaranteesFiring(window));
-  }
-
-  @Test
-  public void testContinuation() throws Exception {
-    OnceTrigger trigger1 = AfterProcessingTime.pastFirstElementInPane();
-    OnceTrigger trigger2 = AfterWatermark.pastEndOfWindow();
-    Trigger afterFirst = AfterFirst.of(trigger1, trigger2);
-    assertEquals(
-        AfterFirst.of(trigger1.getContinuationTrigger(), trigger2.getContinuationTrigger()),
-        afterFirst.getContinuationTrigger());
-  }
-
-  @Test
-  public void testToString() {
-    Trigger trigger = AfterFirst.of(StubTrigger.named("t1"), StubTrigger.named("t2"));
-    assertEquals("AfterFirst.of(t1, t2)", trigger.toString());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterPaneTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterPaneTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterPaneTest.java
deleted file mode 100644
index 38d030e..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterPaneTest.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.beam.sdk.util.TriggerTester;
-import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link AfterPane}.
- */
-@RunWith(JUnit4.class)
-public class AfterPaneTest {
-
-  SimpleTriggerTester<IntervalWindow> tester;
-  /**
-   * Tests that the trigger does fire when enough elements are in a window, and that it only
-   * fires that window (no leakage).
-   */
-  @Test
-  public void testAfterPaneElementCountFixedWindows() throws Exception {
-    tester = TriggerTester.forTrigger(
-        AfterPane.elementCountAtLeast(2),
-        FixedWindows.of(Duration.millis(10)));
-
-    tester.injectElements(1); // [0, 10)
-    IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10));
-    assertFalse(tester.shouldFire(window));
-
-    tester.injectElements(2); // [0, 10)
-    tester.injectElements(11); // [10, 20)
-
-    assertTrue(tester.shouldFire(window)); // ready to fire
-    tester.fireIfShouldFire(window); // and finished
-    assertTrue(tester.isMarkedFinished(window));
-
-    // But don't finish the other window
-    assertFalse(tester.isMarkedFinished(new IntervalWindow(new Instant(10), new Instant(20))));
-  }
-
-  @Test
-  public void testClear() throws Exception {
-    SimpleTriggerTester<IntervalWindow> tester = TriggerTester.forTrigger(
-        AfterPane.elementCountAtLeast(2),
-        FixedWindows.of(Duration.millis(10)));
-
-    tester.injectElements(1, 2, 3);
-    IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10));
-    tester.clearState(window);
-    tester.assertCleared(window);
-  }
-
-  @Test
-  public void testAfterPaneElementCountSessions() throws Exception {
-    tester = TriggerTester.forTrigger(
-        AfterPane.elementCountAtLeast(2),
-        Sessions.withGapDuration(Duration.millis(10)));
-
-    tester.injectElements(
-        1, // in [1, 11)
-        2); // in [2, 12)
-
-    assertFalse(tester.shouldFire(new IntervalWindow(new Instant(1), new Instant(11))));
-    assertFalse(tester.shouldFire(new IntervalWindow(new Instant(2), new Instant(12))));
-
-    tester.mergeWindows();
-
-    IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(12));
-    assertTrue(tester.shouldFire(mergedWindow));
-    tester.fireIfShouldFire(mergedWindow);
-    assertTrue(tester.isMarkedFinished(mergedWindow));
-
-    // Because we closed the previous window, we don't have it around to merge with. So there
-    // will be a new FIRE_AND_FINISH result.
-    tester.injectElements(
-        7,  // in [7, 17)
-        9); // in [9, 19)
-
-    tester.mergeWindows();
-
-    IntervalWindow newMergedWindow = new IntervalWindow(new Instant(7), new Instant(19));
-    assertTrue(tester.shouldFire(newMergedWindow));
-    tester.fireIfShouldFire(newMergedWindow);
-    assertTrue(tester.isMarkedFinished(newMergedWindow));
-  }
-
-  @Test
-  public void testFireDeadline() throws Exception {
-    assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE,
-        AfterPane.elementCountAtLeast(1).getWatermarkThatGuaranteesFiring(
-            new IntervalWindow(new Instant(0), new Instant(10))));
-  }
-
-  @Test
-  public void testContinuation() throws Exception {
-    assertEquals(
-        AfterPane.elementCountAtLeast(1),
-        AfterPane.elementCountAtLeast(100).getContinuationTrigger());
-    assertEquals(
-        AfterPane.elementCountAtLeast(1),
-        AfterPane.elementCountAtLeast(100).getContinuationTrigger().getContinuationTrigger());
-  }
-
-  @Test
-  public void testToString() {
-    Trigger trigger = AfterPane.elementCountAtLeast(5);
-    assertEquals("AfterPane.elementCountAtLeast(5)", trigger.toString());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTimeTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTimeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTimeTest.java
deleted file mode 100644
index 13a7acf..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTimeTest.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
-import org.apache.beam.sdk.util.TriggerTester;
-import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests the {@link AfterProcessingTime}.
- */
-@RunWith(JUnit4.class)
-public class AfterProcessingTimeTest {
-
-  /**
-   * Tests the basic property that the trigger does wait for processing time to be
-   * far enough advanced.
-   */
-  @Test
-  public void testAfterProcessingTimeFixedWindows() throws Exception {
-    Duration windowDuration = Duration.millis(10);
-    SimpleTriggerTester<IntervalWindow> tester = TriggerTester.forTrigger(
-        AfterProcessingTime
-            .pastFirstElementInPane()
-            .plusDelayOf(Duration.millis(5)),
-        FixedWindows.of(windowDuration));
-
-    tester.advanceProcessingTime(new Instant(10));
-
-    // Timer at 15
-    tester.injectElements(1);
-    IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new Instant(10));
-    tester.advanceProcessingTime(new Instant(12));
-    assertFalse(tester.shouldFire(firstWindow));
-
-    // Load up elements in the next window, timer at 17 for them
-    tester.injectElements(11, 12, 13);
-    IntervalWindow secondWindow = new IntervalWindow(new Instant(10), new Instant(20));
-    assertFalse(tester.shouldFire(secondWindow));
-
-    // Not quite time to fire
-    tester.advanceProcessingTime(new Instant(14));
-    assertFalse(tester.shouldFire(firstWindow));
-    assertFalse(tester.shouldFire(secondWindow));
-
-    // Timer at 19 for these in the first window; it should be ignored since the 15 will fire first
-    tester.injectElements(2, 3);
-
-    // Advance past the first timer and fire, finishing the first window
-    tester.advanceProcessingTime(new Instant(16));
-    assertTrue(tester.shouldFire(firstWindow));
-    assertFalse(tester.shouldFire(secondWindow));
-    tester.fireIfShouldFire(firstWindow);
-    assertTrue(tester.isMarkedFinished(firstWindow));
-
-    // The next window fires and finishes now
-    tester.advanceProcessingTime(new Instant(18));
-    assertTrue(tester.shouldFire(secondWindow));
-    tester.fireIfShouldFire(secondWindow);
-    assertTrue(tester.isMarkedFinished(secondWindow));
-  }
-
-  /**
-   * Tests that when windows merge, if the trigger is waiting for "N millis after the first
-   * element" that it is relative to the earlier of the two merged windows.
-   */
-  @Test
-  public void testClear() throws Exception {
-    SimpleTriggerTester<IntervalWindow> tester = TriggerTester.forTrigger(
-        AfterProcessingTime
-            .pastFirstElementInPane()
-            .plusDelayOf(Duration.millis(5)),
-        FixedWindows.of(Duration.millis(10)));
-
-    tester.injectElements(1, 2, 3);
-    IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10));
-    tester.clearState(window);
-    tester.assertCleared(window);
-  }
-
-  @Test
-  public void testAfterProcessingTimeWithMergingWindow() throws Exception {
-    SimpleTriggerTester<IntervalWindow> tester = TriggerTester.forTrigger(
-        AfterProcessingTime
-            .pastFirstElementInPane()
-            .plusDelayOf(Duration.millis(5)),
-        Sessions.withGapDuration(Duration.millis(10)));
-
-    tester.advanceProcessingTime(new Instant(10));
-    tester.injectElements(1); // in [1, 11), timer for 15
-    IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11));
-    assertFalse(tester.shouldFire(firstWindow));
-
-    tester.advanceProcessingTime(new Instant(12));
-    tester.injectElements(3); // in [3, 13), timer for 17
-    IntervalWindow secondWindow = new IntervalWindow(new Instant(3), new Instant(13));
-    assertFalse(tester.shouldFire(secondWindow));
-
-    tester.mergeWindows();
-    IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(13));
-
-    tester.advanceProcessingTime(new Instant(16));
-    assertTrue(tester.shouldFire(mergedWindow));
-  }
-
-  @Test
-  public void testFireDeadline() throws Exception {
-    assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE,
-        AfterProcessingTime.pastFirstElementInPane().getWatermarkThatGuaranteesFiring(
-            new IntervalWindow(new Instant(0), new Instant(10))));
-  }
-
-  @Test
-  public void testContinuation() throws Exception {
-    OnceTrigger firstElementPlus1 =
-        AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardHours(1));
-    assertEquals(
-        new AfterSynchronizedProcessingTime(),
-        firstElementPlus1.getContinuationTrigger());
-  }
-
-  /**
-   * Basic test of compatibility check between identical triggers.
-   */
-  @Test
-  public void testCompatibilityIdentical() throws Exception {
-    Trigger t1 = AfterProcessingTime.pastFirstElementInPane()
-            .plusDelayOf(Duration.standardMinutes(1L));
-    Trigger t2 = AfterProcessingTime.pastFirstElementInPane()
-            .plusDelayOf(Duration.standardMinutes(1L));
-    assertTrue(t1.isCompatible(t2));
-  }
-
-  @Test
-  public void testToString() {
-    Trigger trigger = AfterProcessingTime.pastFirstElementInPane();
-    assertEquals("AfterProcessingTime.pastFirstElementInPane()", trigger.toString());
-  }
-
-  @Test
-  public void testWithDelayToString() {
-    Trigger trigger = AfterProcessingTime.pastFirstElementInPane()
-        .plusDelayOf(Duration.standardMinutes(5));
-
-    assertEquals("AfterProcessingTime.pastFirstElementInPane().plusDelayOf(5 minutes)",
-        trigger.toString());
-  }
-
-  @Test
-  public void testBuiltUpToString() {
-    Trigger trigger = AfterWatermark.pastEndOfWindow()
-        .withLateFirings(AfterProcessingTime
-            .pastFirstElementInPane()
-            .plusDelayOf(Duration.standardMinutes(10)));
-
-    String expected = "AfterWatermark.pastEndOfWindow()"
-        + ".withLateFirings(AfterProcessingTime"
-        + ".pastFirstElementInPane()"
-        + ".plusDelayOf(10 minutes))";
-
-    assertEquals(expected, trigger.toString());
-  }
-}


Mime
View raw message