beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [12/17] incubator-beam git commit: Move triggers to runners-core
Date Thu, 13 Oct 2016 22:22:46 GMT
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/FinishedTriggersSet.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/FinishedTriggersSet.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/FinishedTriggersSet.java
new file mode 100644
index 0000000..a9feb73
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/FinishedTriggersSet.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import com.google.common.collect.Sets;
+import java.util.Set;
+
+/**
+ * An implementation of {@link FinishedTriggers} atop a user-provided mutable {@link Set}.
+ */
+public class FinishedTriggersSet implements FinishedTriggers {
+
+  private final Set<ExecutableTrigger> finished;
+
+  private FinishedTriggersSet(Set<ExecutableTrigger> backingSet) {
+    finished = backingSet;
+  }
+
+  /** Wraps {@code finishedTriggers} without copying, so changes are made to that same set. */
+  public static FinishedTriggersSet fromSet(Set<ExecutableTrigger> finishedTriggers) {
+    return new FinishedTriggersSet(finishedTriggers);
+  }
+
+  /** Returns the mutable {@link Set} of finished triggers that backs this instance. */
+  public Set<ExecutableTrigger> getFinishedTriggers() {
+    return finished;
+  }
+
+  @Override
+  public boolean isFinished(ExecutableTrigger trigger) {
+    return finished.contains(trigger);
+  }
+
+  @Override
+  public void setFinished(ExecutableTrigger trigger, boolean value) {
+    if (!value) {
+      finished.remove(trigger);
+      return;
+    }
+    finished.add(trigger);
+  }
+
+  /** Unmarks {@code trigger} and then, recursively, each of its sub-triggers. */
+  @Override
+  public void clearRecursively(ExecutableTrigger trigger) {
+    finished.remove(trigger);
+    for (ExecutableTrigger child : trigger.subTriggers()) {
+      clearRecursively(child);
+    }
+  }
+
+  /** Returns a new instance backed by a fresh {@code HashSet} copy of the current contents. */
+  @Override
+  public FinishedTriggersSet copy() {
+    return new FinishedTriggersSet(Sets.newHashSet(finished));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/Never.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/Never.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/Never.java
new file mode 100644
index 0000000..5f20465
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/Never.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import java.util.List;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
+import org.joda.time.Instant;
+
+/**
+ * A trigger which never fires.
+ *
+ * <p>Using this trigger will only produce output when the watermark passes the end of the
+ * {@link BoundedWindow window} plus the {@link Window#withAllowedLateness allowed
+ * lateness}.
+ */
+public final class Never {
+  /**
+   * Returns a trigger which never fires. Output will be produced from the using {@link GroupByKey}
+   * when the {@link BoundedWindow} closes.
+   */
+  public static OnceTrigger ever() {
+    // NeverTrigger ignores all inputs and is Window-type independent.
+    return new NeverTrigger();
+  }
+
+  // package-private in order to check identity for string formatting.
+  static class NeverTrigger extends OnceTrigger {
+    protected NeverTrigger() {
+      super(null);
+    }
+
+    @Override
+    public void onElement(OnElementContext c) {}
+
+    @Override
+    public void onMerge(OnMergeContext c) {}
+
+    @Override
+    protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
+      return this;
+    }
+
+    @Override
+    public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
+      return BoundedWindow.TIMESTAMP_MAX_VALUE;
+    }
+
+    @Override
+    public boolean shouldFire(Trigger.TriggerContext context) {
+      return false;
+    }
+
+    @Override
+    protected void onOnlyFiring(Trigger.TriggerContext context) {
+      throw new UnsupportedOperationException(
+          String.format("%s should never fire", getClass().getSimpleName()));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/OrFinallyTrigger.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/OrFinallyTrigger.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/OrFinallyTrigger.java
new file mode 100644
index 0000000..25b7b34
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/OrFinallyTrigger.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.util.ExecutableTrigger;
+import org.joda.time.Instant;
+
+/**
+ * Executes the {@code actual} trigger until it finishes or until the {@code until} trigger fires.
+ */
+class OrFinallyTrigger extends Trigger {
+
+  private static final int ACTUAL = 0;
+  private static final int UNTIL = 1;
+
+  @VisibleForTesting OrFinallyTrigger(Trigger actual, Trigger.OnceTrigger until) {
+    super(Arrays.asList(actual, until));
+  }
+
+  @Override
+  public void onElement(OnElementContext c) throws Exception {
+    c.trigger().subTrigger(ACTUAL).invokeOnElement(c);
+    c.trigger().subTrigger(UNTIL).invokeOnElement(c);
+  }
+
+  @Override
+  public void onMerge(OnMergeContext c) throws Exception {
+    for (ExecutableTrigger child : c.trigger().subTriggers()) {
+      child.invokeOnMerge(c);
+    }
+    updateFinishedState(c);
+  }
+
+  @Override
+  public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
+    // Either sub-trigger firing fires this trigger, so the earlier of the two guarantees applies.
+    Instant fromActual = subTriggers.get(ACTUAL).getWatermarkThatGuaranteesFiring(window);
+    Instant fromUntil = subTriggers.get(UNTIL).getWatermarkThatGuaranteesFiring(window);
+    return fromActual.isBefore(fromUntil) ? fromActual : fromUntil;
+  }
+
+  @Override
+  public Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
+    // OrFinallyTrigger is used rather than AfterFirst because the continuation of ACTUAL
+    // may not be a OnceTrigger.
+    Trigger actualNext = continuationTriggers.get(ACTUAL);
+    Trigger.OnceTrigger untilNext = (Trigger.OnceTrigger) continuationTriggers.get(UNTIL);
+    return Repeatedly.forever(new OrFinallyTrigger(actualNext, untilNext));
+  }
+
+  @Override
+  public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
+    if (context.trigger().subTrigger(ACTUAL).invokeShouldFire(context)) {
+      return true;
+    }
+    return context.trigger().subTrigger(UNTIL).invokeShouldFire(context);
+  }
+
+  @Override
+  public void onFire(Trigger.TriggerContext context) throws Exception {
+    ExecutableTrigger actual = context.trigger().subTrigger(ACTUAL);
+    ExecutableTrigger until = context.trigger().subTrigger(UNTIL);
+    if (!until.invokeShouldFire(context)) {
+      // Until is not ready, so actual must be (else onFire may not be called): we are done only
+      // if actual is done. Until is not cleared because it tracks data across firings.
+      actual.invokeOnFire(context);
+    } else {
+      until.invokeOnFire(context);
+      actual.invokeClear(context);
+    }
+    updateFinishedState(context);
+  }
+
+  @Override
+  public String toString() {
+    return String.format("%s.orFinally(%s)", subTriggers.get(ACTUAL), subTriggers.get(UNTIL));
+  }
+
+  /** Sets this trigger's finished bit to whether any of its sub-triggers is finished. */
+  private void updateFinishedState(TriggerContext c) throws Exception {
+    boolean finished = false;
+    for (ExecutableTrigger child : c.trigger().subTriggers()) {
+      finished |= c.forTrigger(child).trigger().isFinished();
+    }
+    c.trigger().setFinished(finished);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/Repeatedly.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/Repeatedly.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/Repeatedly.java
new file mode 100644
index 0000000..8858798
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/Repeatedly.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.util.ExecutableTrigger;
+import org.joda.time.Instant;
+
+/**
+ * Repeat a trigger, either until some condition is met or forever.
+ *
+ * <p>For example, to fire after the end of the window, and every time late data arrives:
+ * <pre> {@code
+ *     Repeatedly.forever(AfterWatermark.isPastEndOfWindow());
+ * } </pre>
+ *
+ * <p>{@code Repeatedly.forever(someTrigger)} behaves like an infinite
+ * {@code AfterEach.inOrder(someTrigger, someTrigger, someTrigger, ...)}.
+ */
+public class Repeatedly extends Trigger {
+
+  private static final int REPEATED = 0;
+
+  /**
+   * Builds a composite trigger that runs {@code repeated} over and over: it fires whenever
+   * {@code repeated} fires and disregards any indication that {@code repeated} has finished.
+   *
+   * <p>The composite never finishes unless it is combined with {@link Trigger#orFinally}.
+   *
+   * @param repeated the trigger to execute repeatedly.
+   */
+  public static Repeatedly forever(Trigger repeated) {
+    return new Repeatedly(repeated);
+  }
+
+  private Repeatedly(Trigger repeated) {
+    super(Arrays.asList(repeated));
+  }
+
+  @Override
+  public void onElement(OnElementContext c) throws Exception {
+    repeatedSubTrigger(c).invokeOnElement(c);
+  }
+
+  @Override
+  public void onMerge(OnMergeContext c) throws Exception {
+    repeatedSubTrigger(c).invokeOnMerge(c);
+  }
+
+  @Override
+  public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
+    // The repeated trigger firing is what fires this one, so its guarantee is reused as-is.
+    return subTriggers.get(REPEATED).getWatermarkThatGuaranteesFiring(window);
+  }
+
+  @Override
+  public Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
+    return new Repeatedly(continuationTriggers.get(REPEATED));
+  }
+
+  @Override
+  public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
+    return repeatedSubTrigger(context).invokeShouldFire(context);
+  }
+
+  @Override
+  public void onFire(TriggerContext context) throws Exception {
+    repeatedSubTrigger(context).invokeOnFire(context);
+    if (!context.trigger().isFinished(REPEATED)) {
+      return;
+    }
+    // resetTree recursively clears the finished bits and invokes clear.
+    context.forTrigger(repeatedSubTrigger(context)).trigger().resetTree();
+  }
+
+  @Override
+  public String toString() {
+    return String.format("Repeatedly.forever(%s)", subTriggers.get(REPEATED));
+  }
+
+  /** Looks up the executable form of the single repeated sub-trigger via {@code context}. */
+  private ExecutableTrigger repeatedSubTrigger(TriggerContext context) {
+    return context.trigger().subTrigger(REPEATED);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/ReshuffleTrigger.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/ReshuffleTrigger.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/ReshuffleTrigger.java
new file mode 100644
index 0000000..9e2c27d
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/ReshuffleTrigger.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import java.util.List;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.Trigger;
+import org.joda.time.Instant;
+
+/**
+ * The trigger used with {@link Reshuffle} which triggers on every element
+ * and never buffers state.
+ *
+ * @param <W> The kind of window that is being reshuffled.
+ */
+public class ReshuffleTrigger<W extends BoundedWindow> extends Trigger {
+
+  public ReshuffleTrigger() {
+    super(null);
+  }
+
+  @Override
+  public void onElement(Trigger.OnElementContext c) { }
+
+  @Override
+  public void onMerge(Trigger.OnMergeContext c) { }
+
+  @Override
+  protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
+    return this;
+  }
+
+  @Override
+  public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
+    throw new UnsupportedOperationException(
+        "ReshuffleTrigger should not be used outside of Reshuffle");
+  }
+
+  @Override
+  public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
+    return true;
+  }
+
+  @Override
+  public void onFire(Trigger.TriggerContext context) throws Exception { }
+
+  @Override
+  public String toString() {
+    return "ReshuffleTrigger()";
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/Trigger.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/Trigger.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/Trigger.java
new file mode 100644
index 0000000..a960aa4
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/Trigger.java
@@ -0,0 +1,527 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import com.google.common.base.Joiner;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.util.ExecutableTrigger;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.state.MergingStateAccessor;
+import org.apache.beam.sdk.util.state.StateAccessor;
+import org.joda.time.Instant;
+
+/**
+ * {@code Trigger}s control when the elements for a specific key and window are output. As elements
+ * arrive, they are put into one or more windows by a {@link Window} transform and its associated
+ * {@link WindowFn}, and then passed to the associated {@code Trigger} to determine if the
+ * {@code Window}s contents should be output.
+ *
+ * <p>See {@link org.apache.beam.sdk.transforms.GroupByKey} and {@link Window}
+ * for more information about how grouping with windows works.
+ *
+ * <p>The elements that are assigned to a window since the last time it was fired (or since the
+ * window was created) are placed into the current window pane. Triggers are evaluated against the
+ * elements as they are added. When the root trigger fires, the elements in the current pane will be
+ * output. When the root trigger finishes (indicating it will never fire again), the window is
+ * closed and any new elements assigned to that window are discarded.
+ *
+ * <p>Several predefined {@code Trigger}s are provided:
+ * <ul>
+ *   <li> {@link AfterWatermark} for firing when the watermark passes a timestamp determined from
+ *   either the end of the window or the arrival of the first element in a pane.
+ *   <li> {@link AfterProcessingTime} for firing after some amount of processing time has elapsed
+ *   (typically since the first element in a pane).
+ *   <li> {@link AfterPane} for firing off a property of the elements in the current pane, such as
+ *   the number of elements that have been assigned to the current pane.
+ * </ul>
+ *
+ * <p>In addition, {@code Trigger}s can be combined in a variety of ways:
+ * <ul>
+ *   <li> {@link Repeatedly#forever} to create a trigger that executes forever. Any time its
+ *   argument finishes it gets reset and starts over. Can be combined with
+ *   {@link Trigger#orFinally} to specify a condition that causes the repetition to stop.
+ *   <li> {@link AfterEach#inOrder} to execute each trigger in sequence, firing each (and every)
+ *   time that a trigger fires, and advancing to the next trigger in the sequence when it finishes.
+ *   <li> {@link AfterFirst#of} to create a trigger that fires after at least one of its arguments
+ *   fires. An {@link AfterFirst} trigger finishes after it fires once.
+ *   <li> {@link AfterAll#of} to create a trigger that fires after all of its arguments
+ *   have fired at least once. An {@link AfterAll} trigger finishes after it fires once.
+ * </ul>
+ *
+ * <p>Each trigger tree is instantiated per-key and per-window. Every trigger in the tree is in one
+ * of the following states:
+ * <ul>
+ *   <li> Never Existed - before the trigger has started executing, there is no state associated
+ *   with it anywhere in the system. A trigger moves to the executing state as soon as it
+ *   processes an element in the current pane.
+ *   <li> Executing - while the trigger is receiving items and may fire. While it is in this state,
+ *   it may persist book-keeping information to persisted state, set timers, etc.
+ *   <li> Finished - after a trigger finishes, all of its book-keeping data is cleaned up, and the
+ *   system remembers only that it is finished. Entering this state causes us to discard any
+ *   elements in the buffer for that window, as well.
+ * </ul>
+ *
+ * <p>Once finished, a trigger cannot return itself back to an earlier state, however a composite
+ * trigger could reset its sub-triggers.
+ *
+ * <p>Triggers should not build up any state internally since they may be recreated
+ * between invocations of the callbacks. All important values should be persisted using
+ * state before the callback returns.
+ */
+@Experimental(Experimental.Kind.TRIGGER)
+public abstract class Trigger implements Serializable {
+
+  /**
+   * Interface for accessing information about the trigger being executed and other triggers in the
+   * same tree.
+   */
+  public interface TriggerInfo {
+
+    /**
+     * Returns true if the windowing strategy of the current {@code PCollection} is a merging
+     * WindowFn. If true, the trigger execution needs to keep enough information to support the
+     * possibility of {@link Trigger#onMerge} being called. If false, {@link Trigger#onMerge} will
+     * never be called.
+     */
+    boolean isMerging();
+
+    /**
+     * Access the executable versions of the sub-triggers of the current trigger.
+     */
+    Iterable<ExecutableTrigger> subTriggers();
+
+    /**
+     * Access the executable version of the specified sub-trigger.
+     */
+    ExecutableTrigger subTrigger(int subtriggerIndex);
+
+    /**
+     * Returns true if the current trigger is marked finished.
+     */
+    boolean isFinished();
+
+    /**
+     * Return true if the given subtrigger is marked finished.
+     */
+    boolean isFinished(int subtriggerIndex);
+
+    /**
+     * Returns true if all the sub-triggers of the current trigger are marked finished.
+     */
+    boolean areAllSubtriggersFinished();
+
+    /**
+     * Returns an iterable over the unfinished sub-triggers of the current trigger.
+     */
+    Iterable<ExecutableTrigger> unfinishedSubTriggers();
+
+    /**
+     * Returns the first unfinished sub-trigger.
+     */
+    ExecutableTrigger firstUnfinishedSubTrigger();
+
+    /**
+     * Clears all keyed state for triggers in the current sub-tree and unsets all the associated
+     * finished bits.
+     */
+    void resetTree() throws Exception;
+
+    /**
+     * Sets the finished bit for the current trigger.
+     */
+    void setFinished(boolean finished);
+
+    /**
+     * Sets the finished bit for the given sub-trigger.
+     */
+    void setFinished(boolean finished, int subTriggerIndex);
+  }
+
+  /**
+   * Interact with properties of the trigger being executed, with extensions to deal with the
+   * merging windows.
+   */
+  public interface MergingTriggerInfo extends TriggerInfo {
+
+    /** Returns true if the trigger is finished in at least one of the windows being merged. */
+    boolean finishedInAnyMergingWindow();
+
+    /** Returns true if the trigger is finished in every one of the windows being merged. */
+    boolean finishedInAllMergingWindows();
+  }
+
+  /**
+   * Information accessible to all operational hooks in this {@code Trigger}.
+   *
+   * <p>Used directly in {@link Trigger#shouldFire} and {@link Trigger#clear}, and
+   * extended with additional information in other methods.
+   */
+  public abstract class TriggerContext {
+
+    /** Returns the interface for accessing trigger info. */
+    public abstract TriggerInfo trigger();
+
+    /** Returns the interface for accessing persistent state. */
+    public abstract StateAccessor<?> state();
+
+    /** The window that the current context is executing in. */
+    public abstract BoundedWindow window();
+
+    /** Create a sub-context for the given sub-trigger. */
+    public abstract TriggerContext forTrigger(ExecutableTrigger trigger);
+
+    /**
+     * Removes the timer set in this trigger context for the given {@link Instant}
+     * and {@link TimeDomain}.
+     */
+    public abstract void deleteTimer(Instant timestamp, TimeDomain domain);
+
+    /** The current processing time. */
+    public abstract Instant currentProcessingTime();
+
+    /** The current synchronized upstream processing time or {@code null} if unknown. */
+    @Nullable
+    public abstract Instant currentSynchronizedProcessingTime();
+
+    /** The current event time for the input or {@code null} if unknown. */
+    @Nullable
+    public abstract Instant currentEventTime();
+  }
+
+  /**
+   * Extended {@link TriggerContext} containing information accessible to the {@link #onElement}
+   * operational hook.
+   */
+  public abstract class OnElementContext extends TriggerContext {
+    /** The event timestamp of the element currently being processed. */
+    public abstract Instant eventTimestamp();
+
+    /**
+     * Sets a timer to fire when the watermark or processing time is beyond the given timestamp.
+     * Timers are not guaranteed to fire immediately, but will be delivered at some time afterwards.
+     *
+     * <p>As with {@link #state}, timers are implicitly scoped to the current window. All
+     * timer firings for a window will be received, but the implementation should choose to ignore
+     * those that are not applicable.
+     *
+     * @param timestamp the time at which the trigger should be re-evaluated
+     * @param domain the domain that the {@code timestamp} applies to
+     */
+    public abstract void setTimer(Instant timestamp, TimeDomain domain);
+
+    /** Create an {@code OnElementContext} for executing the given trigger. */
+    @Override
+    public abstract OnElementContext forTrigger(ExecutableTrigger trigger);
+  }
+
+  /**
+   * Extended {@link TriggerContext} containing information accessible to the {@link #onMerge}
+   * operational hook.
+   */
+  public abstract class OnMergeContext extends TriggerContext {
+    /**
+     * Sets a timer to fire when the watermark or processing time is beyond the given timestamp.
+     * Timers are not guaranteed to fire immediately, but will be delivered at some time afterwards.
+     *
+     * <p>As with {@link #state}, timers are implicitly scoped to the current window. All
+     * timer firings for a window will be received, but the implementation should choose to ignore
+     * those that are not applicable.
+     *
+     * @param timestamp the time at which the trigger should be re-evaluated
+     * @param domain the domain that the {@code timestamp} applies to
+     */
+    public abstract void setTimer(Instant timestamp, TimeDomain domain);
+
+    /** Create an {@code OnMergeContext} for executing the given trigger. */
+    @Override
+    public abstract OnMergeContext forTrigger(ExecutableTrigger trigger);
+
+    @Override
+    public abstract MergingStateAccessor<?, ?> state();
+
+    @Override
+    public abstract MergingTriggerInfo trigger();
+  }
+
+  @Nullable
+  protected final List<Trigger> subTriggers;
+
+  protected Trigger(@Nullable List<Trigger> subTriggers) {
+    this.subTriggers = subTriggers;
+  }
+
+
+  /**
+   * Called every time an element is incorporated into a window.
+   */
+  public abstract void onElement(OnElementContext c) throws Exception;
+
+  /**
+   * Called immediately after windows have been merged.
+   *
+   * <p>Leaf triggers should update their state by inspecting their status and any state
+   * in the merging windows. Composite triggers should update their state by calling
+   * {@link ExecutableTrigger#invokeOnMerge} on their sub-triggers, and applying appropriate logic.
+   *
+   * <p>A trigger such as {@link AfterWatermark#pastEndOfWindow} may no longer be finished;
+   * it is the responsibility of the trigger itself to record this fact. It is forbidden for
+   * a trigger to become finished due to {@link #onMerge}, as it has not yet fired the pending
+   * elements that led to it being ready to fire.
+   *
+   * <p>The implementation does not need to clear out any state associated with the old windows.
+   */
+  public abstract void onMerge(OnMergeContext c) throws Exception;
+
+  /**
+   * Returns {@code true} if the current state of the trigger indicates that its condition
+   * is satisfied and it is ready to fire.
+   */
+  public abstract boolean shouldFire(TriggerContext context) throws Exception;
+
+  /**
+   * Adjusts the state of the trigger to be ready for the next pane. For example, a
+   * {@link Repeatedly} trigger will reset its inner trigger, since it has fired.
+   *
+   * <p>If the trigger is finished, it is the responsibility of the trigger itself to
+   * record that fact via the {@code context}.
+   */
+  public abstract void onFire(TriggerContext context) throws Exception;
+
+  /**
+   * Lets the trigger prefetch any state it will likely read during an {@link #onElement} call.
+   */
+  public void prefetchOnElement(StateAccessor<?> state) {
+    if (subTriggers == null) {
+      return; // no sub-triggers to forward the request to
+    }
+    for (Trigger child : subTriggers) {
+      child.prefetchOnElement(state);
+    }
+  }
+
+  /**
+   * Lets the trigger prefetch any state it will likely read during an {@link #onMerge} call.
+   */
+  public void prefetchOnMerge(MergingStateAccessor<?, ?> state) {
+    if (subTriggers == null) {
+      return; // no sub-triggers to forward the request to
+    }
+    for (Trigger child : subTriggers) {
+      child.prefetchOnMerge(state);
+    }
+  }
+
+  /**
+   * Called to allow the trigger to prefetch any state it will likely need to read from during
+   * an {@link #shouldFire} call.
+   */
+  public void prefetchShouldFire(StateAccessor<?> state) {
+    // A trigger without sub-triggers has nothing to forward the request to.
+    if (subTriggers == null) {
+      return;
+    }
+    for (Trigger child : subTriggers) {
+      child.prefetchShouldFire(state);
+    }
+  }
+
+  /**
+   * Called to allow the trigger to prefetch any state it will likely need to read from during
+   * an {@link #onFire} call.
+   */
+  public void prefetchOnFire(StateAccessor<?> state) {
+    // A trigger without sub-triggers has nothing to forward the request to.
+    if (subTriggers == null) {
+      return;
+    }
+    for (Trigger child : subTriggers) {
+      child.prefetchOnFire(state);
+    }
+  }
+
+  /**
+   * Clear any state associated with this trigger in the given window.
+   *
+   * <p>This is called after a trigger has indicated it will never fire again. The trigger system
+   * keeps enough information to know that the trigger is finished, so this trigger should clear all
+   * of its state.
+   */
+  public void clear(TriggerContext c) throws Exception {
+    if (subTriggers == null) {
+      return;
+    }
+    // Clearing goes through the executable wrappers exposed by the context rather than
+    // through the raw sub-trigger specs held in this object's own field.
+    for (ExecutableTrigger child : c.trigger().subTriggers()) {
+      child.invokeClear(c);
+    }
+  }
+
+  /**
+   * Returns the sub-triggers of this trigger exactly as stored in the {@code subTriggers}
+   * field. The result may be {@code null}: the other methods of this class check the field
+   * for {@code null} before iterating over it.
+   */
+  public Iterable<Trigger> subTriggers() {
+    return subTriggers;
+  }
+
+  /**
+   * Return a trigger to use after a {@code GroupByKey} to preserve the
+   * intention of this trigger. Specifically, triggers that are time based
+   * and intended to provide speculative results should continue providing
+   * speculative results. Triggers that fire once (or multiple times) should
+   * continue firing once (or multiple times).
+   */
+  public Trigger getContinuationTrigger() {
+    // With no sub-triggers the subclass hook receives null; otherwise it receives the
+    // continuation of every sub-trigger, in order.
+    List<Trigger> continuations = null;
+    if (subTriggers != null) {
+      continuations = new ArrayList<>();
+      for (Trigger child : subTriggers) {
+        continuations.add(child.getContinuationTrigger());
+      }
+    }
+    return getContinuationTrigger(continuations);
+  }
+
+  /**
+   * Return the {@link #getContinuationTrigger} of this {@code Trigger}. For convenience, this
+   * is provided the continuation trigger of each of the sub-triggers.
+   */
+  protected abstract Trigger getContinuationTrigger(List<Trigger> continuationTriggers);
+
+  /**
+   * Returns a bound in watermark time by which this trigger would have fired at least once
+   * for a given window had there been input data.  This is a static property of a trigger
+   * that does not depend on its state.
+   *
+   * <p>For triggers that do not fire based on the watermark advancing, returns
+   * {@link BoundedWindow#TIMESTAMP_MAX_VALUE}.
+   *
+   * <p>This estimate is used to determine that there are no elements in a side-input window, which
+   * causes the default value to be used instead.
+   */
+  public abstract Instant getWatermarkThatGuaranteesFiring(BoundedWindow window);
+
+  /**
+   * Returns whether this performs the same triggering as the given {@code Trigger}.
+   */
+  public boolean isCompatible(Trigger other) {
+    if (!getClass().equals(other.getClass())) {
+      return false;
+    }
+
+    if (subTriggers == null) {
+      return other.subTriggers == null;
+    } else if (other.subTriggers == null) {
+      return false;
+    } else if (subTriggers.size() != other.subTriggers.size()) {
+      return false;
+    }
+
+    for (int i = 0; i < subTriggers.size(); i++) {
+      if (!subTriggers.get(i).isCompatible(other.subTriggers.get(i))) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  @Override
+  public String toString() {
+    String simpleName = getClass().getSimpleName();
+    if (getClass().getEnclosingClass() != null) {
+      simpleName = getClass().getEnclosingClass().getSimpleName() + "." + simpleName;
+    }
+    if (subTriggers == null || subTriggers.size() == 0) {
+      return simpleName;
+    } else {
+      return simpleName + "(" + Joiner.on(", ").join(subTriggers) + ")";
+    }
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (!(obj instanceof Trigger)) {
+      return false;
+    }
+    Trigger that = (Trigger) obj;
+    return Objects.equals(getClass(), that.getClass())
+        && Objects.equals(subTriggers, that.subTriggers);
+  }
+
+  @Override
+  public int hashCode() {
+    // Hashes the same two components that equals() compares: concrete class and sub-triggers.
+    return Objects.hash(getClass(), subTriggers);
+  }
+
+  /**
+   * Specify an ending condition for this trigger. If the {@code until} fires then the combination
+   * fires.
+   *
+   * <p>The expression {@code t1.orFinally(t2)} fires every time {@code t1} fires, and finishes
+   * as soon as either {@code t1} finishes or {@code t2} fires, in which case it fires one last time
+   * for {@code t2}. Both {@code t1} and {@code t2} are executed in parallel. This means that
+   * {@code t1} may have fired since {@code t2} started, so not all of the elements that {@code t2}
+   * has seen are necessarily in the current pane.
+   *
+   * <p>For example the final firing of the following trigger may only have 1 element:
+   * <pre> {@code
+   * Repeatedly.forever(AfterPane.elementCountAtLeast(2))
+   *     .orFinally(AfterPane.elementCountAtLeast(5))
+   * } </pre>
+   *
+   * <p>Note that if {@code t1} is {@link OnceTrigger}, then {@code t1.orFinally(t2)} is the same
+   * as {@code AfterFirst.of(t1, t2)}.
+   */
+  public Trigger orFinally(OnceTrigger until) {
+    // Pairs this trigger with the ending condition in a new OrFinallyTrigger.
+    return new OrFinallyTrigger(this, until);
+  }
+
+  /**
+   * {@link Trigger}s that are guaranteed to fire at most once should extend from this, rather
+   * than the general {@link Trigger} class to indicate that behavior.
+   */
+  public abstract static class OnceTrigger extends Trigger {
+    protected OnceTrigger(List<Trigger> subTriggers) {
+      super(subTriggers);
+    }
+
+    /**
+     * Narrows the continuation computed by {@link Trigger} to a {@link OnceTrigger}.
+     *
+     * @throws IllegalStateException if that continuation is not itself a {@link OnceTrigger}
+     */
+    @Override
+    public final OnceTrigger getContinuationTrigger() {
+      Trigger continuation = super.getContinuationTrigger();
+      if (continuation instanceof OnceTrigger) {
+        return (OnceTrigger) continuation;
+      }
+      throw new IllegalStateException("Continuation of a OnceTrigger must be a OnceTrigger");
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Delegates to {@link #onOnlyFiring} and then marks this trigger as finished.
+     */
+    @Override
+    public final void onFire(TriggerContext context) throws Exception {
+      onOnlyFiring(context);
+      context.trigger().setFinished(true);
+    }
+
+    /**
+     * Hook invoked by {@link #onFire} when the trigger fires, immediately before the trigger
+     * is marked as finished. Subclasses supply the firing logic here.
+     */
+    protected abstract void onOnlyFiring(TriggerContext context) throws Exception;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/TriggerContextFactory.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/TriggerContextFactory.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/TriggerContextFactory.java
new file mode 100644
index 0000000..e09aac2
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/TriggerContextFactory.java
@@ -0,0 +1,507 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import java.util.Collection;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.Trigger;
+import org.apache.beam.sdk.transforms.windowing.Trigger.MergingTriggerInfo;
+import org.apache.beam.sdk.transforms.windowing.Trigger.TriggerInfo;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.state.MergingStateAccessor;
+import org.apache.beam.sdk.util.state.State;
+import org.apache.beam.sdk.util.state.StateAccessor;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.util.state.StateNamespace;
+import org.apache.beam.sdk.util.state.StateNamespaces;
+import org.apache.beam.sdk.util.state.StateTag;
+import org.joda.time.Instant;
+
+/**
+ * Factory for creating instances of the various {@link Trigger} contexts.
+ *
+ * <p>These contexts are highly interdependent and share many fields; it is inadvisable
+ * to create them via any means other than this factory class.
+ */
+public class TriggerContextFactory<W extends BoundedWindow> {
+
+  private final WindowFn<?, W> windowFn;
+  private StateInternals<?> stateInternals;
+  private final Coder<W> windowCoder;
+
+  public TriggerContextFactory(WindowFn<?, W> windowFn,
+      StateInternals<?> stateInternals, ActiveWindowSet<W> activeWindows) {
+    // activeWindows is accepted but not retained: future triggers may be able to exploit the
+    // active window to state address window mapping.
+    this.stateInternals = stateInternals;
+    this.windowCoder = windowFn.windowCoder();
+    this.windowFn = windowFn;
+  }
+
+  /**
+   * Creates a plain {@link Trigger.TriggerContext} for {@code rootTrigger} in {@code window},
+   * backed by the given timers and finished set.
+   */
+  public Trigger.TriggerContext base(W window, Timers timers,
+      ExecutableTrigger rootTrigger, FinishedTriggers finishedSet) {
+    return new TriggerContextImpl(window, timers, rootTrigger, finishedSet);
+  }
+
+  /**
+   * Creates a {@link Trigger.OnElementContext} for {@code rootTrigger} in {@code window} that
+   * additionally exposes {@code elementTimestamp} as the event timestamp.
+   */
+  public Trigger.OnElementContext createOnElementContext(
+      W window, Timers timers, Instant elementTimestamp,
+      ExecutableTrigger rootTrigger, FinishedTriggers finishedSet) {
+    return new OnElementContextImpl(window, timers, rootTrigger, finishedSet, elementTimestamp);
+  }
+
+  /**
+   * Creates a {@link Trigger.OnMergeContext} for {@code rootTrigger} in {@code window}.
+   *
+   * <p>{@code finishedSet} is the finished set handed to the new context for {@code window};
+   * the keys of {@code finishedSets} are used as the set of merging windows and its values
+   * as their respective finished sets.
+   */
+  public Trigger.OnMergeContext createOnMergeContext(W window, Timers timers,
+      ExecutableTrigger rootTrigger, FinishedTriggers finishedSet,
+      Map<W, FinishedTriggers> finishedSets) {
+    return new OnMergeContextImpl(window, timers, rootTrigger, finishedSet, finishedSets);
+  }
+
+  /**
+   * Creates a {@link StateAccessor} that reads state in the namespace built from
+   * {@code window} and the index of {@code trigger}.
+   */
+  public StateAccessor<?> createStateAccessor(W window, ExecutableTrigger trigger) {
+    return new StateAccessorImpl(window, trigger);
+  }
+
+  /**
+   * Creates a {@link MergingStateAccessor} whose primary namespace is that of
+   * {@code mergeResult} and which can also reach the state of each window in
+   * {@code mergingWindows}.
+   */
+  public MergingStateAccessor<?, W> createMergingStateAccessor(
+      W mergeResult, Collection<W> mergingWindows, ExecutableTrigger trigger) {
+    return new MergingStateAccessorImpl(trigger, mergingWindows, mergeResult);
+  }
+
+  private class TriggerInfoImpl implements Trigger.TriggerInfo {
+
+    protected final ExecutableTrigger trigger;
+    protected final FinishedTriggers finishedSet;
+    private final Trigger.TriggerContext context;
+
+    public TriggerInfoImpl(ExecutableTrigger trigger, FinishedTriggers finishedSet,
+        Trigger.TriggerContext context) {
+      this.trigger = trigger;
+      this.finishedSet = finishedSet;
+      this.context = context;
+    }
+
+    @Override
+    public boolean isMerging() {
+      return !windowFn.isNonMerging();
+    }
+
+    @Override
+    public Iterable<ExecutableTrigger> subTriggers() {
+      return trigger.subTriggers();
+    }
+
+    @Override
+    public ExecutableTrigger subTrigger(int subtriggerIndex) {
+      return trigger.subTriggers().get(subtriggerIndex);
+    }
+
+    @Override
+    public boolean isFinished() {
+      return finishedSet.isFinished(trigger);
+    }
+
+    @Override
+    public boolean isFinished(int subtriggerIndex) {
+      return finishedSet.isFinished(subTrigger(subtriggerIndex));
+    }
+
+    @Override
+    public boolean areAllSubtriggersFinished() {
+      return Iterables.isEmpty(unfinishedSubTriggers());
+    }
+
+    @Override
+    public Iterable<ExecutableTrigger> unfinishedSubTriggers() {
+      return FluentIterable
+          .from(trigger.subTriggers())
+          .filter(new Predicate<ExecutableTrigger>() {
+            @Override
+            public boolean apply(ExecutableTrigger trigger) {
+              return !finishedSet.isFinished(trigger);
+            }
+          });
+    }
+
+    @Override
+    public ExecutableTrigger firstUnfinishedSubTrigger() {
+      for (ExecutableTrigger subTrigger : trigger.subTriggers()) {
+        if (!finishedSet.isFinished(subTrigger)) {
+          return subTrigger;
+        }
+      }
+      return null;
+    }
+
+    @Override
+    public void resetTree() throws Exception {
+      finishedSet.clearRecursively(trigger);
+      trigger.invokeClear(context);
+    }
+
+    @Override
+    public void setFinished(boolean finished) {
+      finishedSet.setFinished(trigger, finished);
+    }
+
+    @Override
+    public void setFinished(boolean finished, int subTriggerIndex) {
+      finishedSet.setFinished(subTrigger(subTriggerIndex), finished);
+    }
+  }
+
+  private class TriggerTimers implements Timers {
+
+    private final Timers timers;
+    private final W window;
+
+    public TriggerTimers(W window, Timers timers) {
+      this.timers = timers;
+      this.window = window;
+    }
+
+    @Override
+    public void setTimer(Instant timestamp, TimeDomain timeDomain) {
+      timers.setTimer(timestamp, timeDomain);
+    }
+
+    @Override
+    public void deleteTimer(Instant timestamp, TimeDomain timeDomain) {
+      if (timeDomain == TimeDomain.EVENT_TIME
+          && timestamp.equals(window.maxTimestamp())) {
+        // Don't allow triggers to unset the at-max-timestamp timer. This is necessary for on-time
+        // state transitions.
+        return;
+      }
+      timers.deleteTimer(timestamp, timeDomain);
+    }
+
+    @Override
+    public Instant currentProcessingTime() {
+      return timers.currentProcessingTime();
+    }
+
+    @Override
+    @Nullable
+    public Instant currentSynchronizedProcessingTime() {
+      return timers.currentSynchronizedProcessingTime();
+    }
+
+    @Override
+    public Instant currentEventTime() {
+      return timers.currentEventTime();
+    }
+  }
+
+  private class MergingTriggerInfoImpl
+      extends TriggerInfoImpl implements Trigger.MergingTriggerInfo {
+
+    private final Map<W, FinishedTriggers> finishedSets;
+
+    public MergingTriggerInfoImpl(
+        ExecutableTrigger trigger,
+        FinishedTriggers finishedSet,
+        Trigger.TriggerContext context,
+        Map<W, FinishedTriggers> finishedSets) {
+      super(trigger, finishedSet, context);
+      this.finishedSets = finishedSets;
+    }
+
+    @Override
+    public boolean finishedInAnyMergingWindow() {
+      for (FinishedTriggers finishedSet : finishedSets.values()) {
+        if (finishedSet.isFinished(trigger)) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    @Override
+    public boolean finishedInAllMergingWindows() {
+      for (FinishedTriggers finishedSet : finishedSets.values()) {
+        if (!finishedSet.isFinished(trigger)) {
+          return false;
+        }
+      }
+      return true;
+    }
+  }
+
+  /**
+   * {@link StateAccessor} bound to one window and one trigger: every access is resolved in the
+   * namespace built from the window and the trigger's index.
+   */
+  private class StateAccessorImpl implements StateAccessor<Object> {
+    protected final int triggerIndex;
+    protected final StateNamespace windowNamespace;
+
+    public StateAccessorImpl(
+        W window,
+        ExecutableTrigger trigger) {
+      // triggerIndex must be assigned first: namespaceFor() reads it.
+      this.triggerIndex = trigger.getTriggerIndex();
+      this.windowNamespace = namespaceFor(window);
+    }
+
+    /** Builds the state namespace for {@code window} and this accessor's trigger index. */
+    protected StateNamespace namespaceFor(W window) {
+      return StateNamespaces.windowAndTrigger(windowCoder, window, triggerIndex);
+    }
+
+    /** Looks up {@code address} in the namespace of the window given at construction. */
+    @Override
+    public <StateT extends State> StateT access(StateTag<? super Object, StateT> address) {
+      return stateInternals.state(windowNamespace, address);
+    }
+  }
+
+  private class MergingStateAccessorImpl extends StateAccessorImpl
+      implements MergingStateAccessor<Object, W> {
+    private final Collection<W> activeToBeMerged;
+
+    public MergingStateAccessorImpl(ExecutableTrigger trigger, Collection<W> activeToBeMerged,
+        W mergeResult) {
+      super(mergeResult, trigger);
+      this.activeToBeMerged = activeToBeMerged;
+    }
+
+    /** Looks up {@code address} in the namespace of the merge-result window. */
+    @Override
+    public <StateT extends State> StateT access(
+        StateTag<? super Object, StateT> address) {
+      return stateInternals.state(windowNamespace, address);
+    }
+
+    /**
+     * Looks up {@code address} once per window being merged and returns the results as an
+     * immutable map keyed by window.
+     */
+    @Override
+    public <StateT extends State> Map<W, StateT> accessInEachMergingWindow(
+        StateTag<? super Object, StateT> address) {
+      ImmutableMap.Builder<W, StateT> perWindow = ImmutableMap.builder();
+      for (W source : activeToBeMerged) {
+        perWindow.put(source, stateInternals.state(namespaceFor(source), address));
+      }
+      return perWindow.build();
+    }
+  }
+
+  /**
+   * Basic trigger context for one window and one trigger: exposes the window, the trigger's
+   * state accessor, its finished-set view, and time/timer operations delegated to a
+   * {@link TriggerTimers} wrapper.
+   */
+  private class TriggerContextImpl extends Trigger.TriggerContext {
+
+    private final W window;
+    private final StateAccessorImpl state;
+    private final Timers timers;
+    private final TriggerInfoImpl triggerInfo;
+
+    private TriggerContextImpl(
+        W window,
+        Timers timers,
+        ExecutableTrigger trigger,
+        FinishedTriggers finishedSet) {
+      // Qualified superclass constructor call: the trigger spec is supplied as the enclosing
+      // instance of the context superclass.
+      trigger.getSpec().super();
+      this.window = window;
+      this.state = new StateAccessorImpl(window, trigger);
+      // Wrap the timers so the guard in TriggerTimers.deleteTimer applies to this context.
+      this.timers = new TriggerTimers(window, timers);
+      this.triggerInfo = new TriggerInfoImpl(trigger, finishedSet, this);
+    }
+
+    /**
+     * Creates a context for {@code trigger} that shares this context's window and finished set.
+     *
+     * <p>NOTE(review): the timers passed on are this context's already-wrapped
+     * {@link TriggerTimers}, so the new context wraps them a second time.
+     */
+    @Override
+    public Trigger.TriggerContext forTrigger(ExecutableTrigger trigger) {
+      return new TriggerContextImpl(window, timers, trigger, triggerInfo.finishedSet);
+    }
+
+    @Override
+    public TriggerInfo trigger() {
+      return triggerInfo;
+    }
+
+    @Override
+    public StateAccessor<?> state() {
+      return state;
+    }
+
+    @Override
+    public W window() {
+      return window;
+    }
+
+    @Override
+    public void deleteTimer(Instant timestamp, TimeDomain domain) {
+      timers.deleteTimer(timestamp, domain);
+    }
+
+    @Override
+    public Instant currentProcessingTime() {
+      return timers.currentProcessingTime();
+    }
+
+    @Override
+    @Nullable
+    public Instant currentSynchronizedProcessingTime() {
+      return timers.currentSynchronizedProcessingTime();
+    }
+
+    @Override
+    @Nullable
+    public Instant currentEventTime() {
+      return timers.currentEventTime();
+    }
+  }
+
+  /**
+   * Trigger context used while processing an element: like the basic context, but it also
+   * carries the element's timestamp and allows timers to be set.
+   */
+  private class OnElementContextImpl extends Trigger.OnElementContext {
+
+    private final W window;
+    private final StateAccessorImpl state;
+    private final Timers timers;
+    private final TriggerInfoImpl triggerInfo;
+    private final Instant eventTimestamp;
+
+    private OnElementContextImpl(
+        W window,
+        Timers timers,
+        ExecutableTrigger trigger,
+        FinishedTriggers finishedSet,
+        Instant eventTimestamp) {
+      // Qualified superclass constructor call: the trigger spec is supplied as the enclosing
+      // instance of the context superclass.
+      trigger.getSpec().super();
+      this.window = window;
+      this.state = new StateAccessorImpl(window, trigger);
+      // Wrap the timers so the guard in TriggerTimers.deleteTimer applies to this context.
+      this.timers = new TriggerTimers(window, timers);
+      this.triggerInfo = new TriggerInfoImpl(trigger, finishedSet, this);
+      this.eventTimestamp = eventTimestamp;
+    }
+
+
+    @Override
+    public Instant eventTimestamp() {
+      return eventTimestamp;
+    }
+
+    /**
+     * Creates a context for {@code trigger} that shares this context's window, finished set and
+     * element timestamp.
+     *
+     * <p>NOTE(review): the timers passed on are this context's already-wrapped
+     * {@link TriggerTimers}, so the new context wraps them a second time.
+     */
+    @Override
+    public Trigger.OnElementContext forTrigger(ExecutableTrigger trigger) {
+      return new OnElementContextImpl(
+          window, timers, trigger, triggerInfo.finishedSet, eventTimestamp);
+    }
+
+    @Override
+    public TriggerInfo trigger() {
+      return triggerInfo;
+    }
+
+    @Override
+    public StateAccessor<?> state() {
+      return state;
+    }
+
+    @Override
+    public W window() {
+      return window;
+    }
+
+    @Override
+    public void setTimer(Instant timestamp, TimeDomain domain) {
+      timers.setTimer(timestamp, domain);
+    }
+
+
+    @Override
+    public void deleteTimer(Instant timestamp, TimeDomain domain) {
+      timers.deleteTimer(timestamp, domain);
+    }
+
+    @Override
+    public Instant currentProcessingTime() {
+      return timers.currentProcessingTime();
+    }
+
+    @Override
+    @Nullable
+    public Instant currentSynchronizedProcessingTime() {
+      return timers.currentSynchronizedProcessingTime();
+    }
+
+    @Override
+    @Nullable
+    public Instant currentEventTime() {
+      return timers.currentEventTime();
+    }
+  }
+
+  /**
+   * Trigger context used while merging windows: exposes the merge-result window, a merging
+   * state accessor covering the windows being merged, the per-window finished sets, and
+   * time/timer operations delegated to a {@link TriggerTimers} wrapper.
+   */
+  private class OnMergeContextImpl extends Trigger.OnMergeContext {
+    private final MergingStateAccessor<?, W> state;
+    private final W window;
+    private final Collection<W> mergingWindows;
+    private final Timers timers;
+    private final MergingTriggerInfoImpl triggerInfo;
+
+    private OnMergeContextImpl(
+        W window,
+        Timers timers,
+        ExecutableTrigger trigger,
+        FinishedTriggers finishedSet,
+        Map<W, FinishedTriggers> finishedSets) {
+      // Qualified superclass constructor call: the trigger spec is supplied as the enclosing
+      // instance of the context superclass.
+      trigger.getSpec().super();
+      // The windows being merged are exactly the keys of the per-window finished sets.
+      this.mergingWindows = finishedSets.keySet();
+      this.window = window;
+      this.state = new MergingStateAccessorImpl(trigger, mergingWindows, window);
+      this.timers = new TriggerTimers(window, timers);
+      this.triggerInfo = new MergingTriggerInfoImpl(trigger, finishedSet, this, finishedSets);
+    }
+
+    /**
+     * Creates a merge context for {@code trigger} that shares this context's window and
+     * finished sets.
+     */
+    @Override
+    public Trigger.OnMergeContext forTrigger(ExecutableTrigger trigger) {
+      return new OnMergeContextImpl(
+          window, timers, trigger, triggerInfo.finishedSet, triggerInfo.finishedSets);
+    }
+
+    @Override
+    public MergingStateAccessor<?, W> state() {
+      return state;
+    }
+
+    @Override
+    public MergingTriggerInfo trigger() {
+      return triggerInfo;
+    }
+
+    @Override
+    public W window() {
+      return window;
+    }
+
+    @Override
+    public void setTimer(Instant timestamp, TimeDomain domain) {
+      timers.setTimer(timestamp, domain);
+    }
+
+    /**
+     * Deletes the given timer via the wrapped timers.
+     *
+     * <p>This previously forwarded to {@code setTimer}, which registered the timer instead of
+     * removing it; it now delegates to {@code deleteTimer} like the other context classes.
+     */
+    @Override
+    public void deleteTimer(Instant timestamp, TimeDomain domain) {
+      timers.deleteTimer(timestamp, domain);
+    }
+
+    @Override
+    public Instant currentProcessingTime() {
+      return timers.currentProcessingTime();
+    }
+
+    @Override
+    @Nullable
+    public Instant currentSynchronizedProcessingTime() {
+      return timers.currentSynchronizedProcessingTime();
+    }
+
+    @Override
+    @Nullable
+    public Instant currentEventTime() {
+      return timers.currentEventTime();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/TriggerRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/TriggerRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/TriggerRunner.java
new file mode 100644
index 0000000..8d0f322
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/TriggerRunner.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.Map;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
+import org.apache.beam.sdk.transforms.windowing.Trigger;
+import org.apache.beam.sdk.util.BitSetCoder;
+import org.apache.beam.sdk.util.ExecutableTrigger;
+import org.apache.beam.sdk.util.FinishedTriggers;
+import org.apache.beam.sdk.util.FinishedTriggersBitSet;
+import org.apache.beam.sdk.util.Timers;
+import org.apache.beam.sdk.util.TriggerContextFactory;
+import org.apache.beam.sdk.util.state.MergingStateAccessor;
+import org.apache.beam.sdk.util.state.StateAccessor;
+import org.apache.beam.sdk.util.state.StateTag;
+import org.apache.beam.sdk.util.state.StateTags;
+import org.apache.beam.sdk.util.state.ValueState;
+import org.joda.time.Instant;
+
+/**
+ * Executes a trigger while managing persistence of information about which subtriggers are
+ * finished. Subtriggers include all recursive trigger expressions as well as the entire trigger.
+ *
+ * <p>Specifically, the responsibilities are:
+ *
+ * <ul>
+ *   <li>Invoking the trigger's methods via its {@link ExecutableTrigger} wrapper by
+ *       constructing the appropriate trigger contexts.</li>
+ *   <li>Committing a record of which subtriggers are finished to persistent state.</li>
+ *   <li>Restoring the record of which subtriggers are finished from persistent state.</li>
+ *   <li>Clearing out the persisted finished set when a caller indicates
+ *       (via {@link #clearFinished}) that it is no longer needed.</li>
+ * </ul>
+ *
+ * <p>These responsibilities are intertwined: trigger contexts include mutable information about
+ * which subtriggers are finished. This class provides the information when building the contexts
+ * and commits the information when the method of the {@link ExecutableTrigger} returns.
+ *
+ * @param <W> The kind of windows being processed.
+ */
+public class TriggerRunner<W extends BoundedWindow> {
+  @VisibleForTesting
+  static final StateTag<Object, ValueState<BitSet>> FINISHED_BITS_TAG =
+      StateTags.makeSystemTagInternal(StateTags.value("closed", BitSetCoder.of()));
+
+  private final ExecutableTrigger rootTrigger;
+  private final TriggerContextFactory<W> contextFactory;
+
+  /**
+   * Creates a runner for {@code rootTrigger}, using {@code contextFactory} to build the
+   * contexts handed to the trigger.
+   *
+   * @throws IllegalStateException if {@code rootTrigger} does not have trigger index 0
+   */
+  public TriggerRunner(ExecutableTrigger rootTrigger, TriggerContextFactory<W> contextFactory) {
+    checkState(rootTrigger.getTriggerIndex() == 0);
+    this.rootTrigger = rootTrigger;
+    this.contextFactory = contextFactory;
+  }
+
+  private FinishedTriggersBitSet readFinishedBits(ValueState<BitSet> state) {
+    // If no trigger in the tree will ever have finished bits there is nothing to read, so the
+    // lookup is skipped. A skipped lookup and a missing value are handled alike: callers get
+    // an all-zero (nothing finished) set with room for every trigger in the tree.
+    BitSet persisted = isFinishedSetNeeded() ? state.read() : null;
+    if (persisted == null) {
+      return FinishedTriggersBitSet.emptyWithCapacity(rootTrigger.getFirstIndexAfterSubtree());
+    }
+    return FinishedTriggersBitSet.fromBitSet(persisted);
+  }
+
+
+  private void clearFinishedBits(ValueState<BitSet> state) {
+    // When the finished set is never needed there is nothing to clear.
+    if (isFinishedSetNeeded()) {
+      state.clear();
+    }
+  }
+
+  /** Return true if the trigger is closed in the window corresponding to the specified state. */
+  public boolean isClosed(StateAccessor<?> state) {
+    return readFinishedBits(state.access(FINISHED_BITS_TAG)).isFinished(rootTrigger);
+  }
+
+  @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT",
+      justification = "prefetch side effect")
+  public void prefetchForValue(W window, StateAccessor<?> state) {
+    if (isFinishedSetNeeded()) {
+      state.access(FINISHED_BITS_TAG).readLater();
+    }
+    rootTrigger.getSpec().prefetchOnElement(
+        contextFactory.createStateAccessor(window, rootTrigger));
+  }
+
+  @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT",
+      justification = "prefetch side effect")
+  public void prefetchOnFire(W window, StateAccessor<?> state) {
+    if (isFinishedSetNeeded()) {
+      state.access(FINISHED_BITS_TAG).readLater();
+    }
+    rootTrigger.getSpec().prefetchOnFire(contextFactory.createStateAccessor(window, rootTrigger));
+  }
+
+  @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT",
+      justification = "prefetch side effect")
+  public void prefetchShouldFire(W window, StateAccessor<?> state) {
+    if (isFinishedSetNeeded()) {
+      state.access(FINISHED_BITS_TAG).readLater();
+    }
+    rootTrigger.getSpec().prefetchShouldFire(
+        contextFactory.createStateAccessor(window, rootTrigger));
+  }
+
+  /**
+   * Run the trigger logic to deal with a new value.
+   */
+  public void processValue(W window, Instant timestamp, Timers timers, StateAccessor<?> state)
+      throws Exception {
+    // Work on a copy so that changes can be detected and do not pollute merging.
+    FinishedTriggersBitSet workingFinishedSet =
+        readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();
+
+    Trigger.OnElementContext elementContext = contextFactory.createOnElementContext(
+        window, timers, timestamp, rootTrigger, workingFinishedSet);
+    rootTrigger.invokeOnElement(elementContext);
+
+    // Write back whatever the trigger changed in the finished set.
+    persistFinishedSet(state, workingFinishedSet);
+  }
+
+  @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT",
+      justification = "prefetch side effect")
+  public void prefetchForMerge(
+      W window, Collection<W> mergingWindows, MergingStateAccessor<?, W> state) {
+    // Request the finished bits of every merging window, but only if they will be read.
+    if (isFinishedSetNeeded()) {
+      for (ValueState<?> mergingBits :
+          state.accessInEachMergingWindow(FINISHED_BITS_TAG).values()) {
+        mergingBits.readLater();
+      }
+    }
+    // Then let the trigger tree prefetch whatever it needs for the merge.
+    Trigger spec = rootTrigger.getSpec();
+    spec.prefetchOnMerge(
+        contextFactory.createMergingStateAccessor(window, mergingWindows, rootTrigger));
+  }
+
+  /**
+   * Run the trigger merging logic as part of executing the specified merge.
+   */
+  public void onMerge(W window, Timers timers, MergingStateAccessor<?, W> state) throws Exception {
+    // Work on a copy so that changes can be detected and do not pollute merging.
+    FinishedTriggersBitSet resultFinishedSet =
+        readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();
+
+    // Snapshot, then clear, the finished bits of every window being merged. The snapshots are
+    // not copied because the trigger context does not allow them to be modified.
+    ImmutableMap.Builder<W, FinishedTriggers> perWindow = ImmutableMap.builder();
+    for (Map.Entry<W, ValueState<BitSet>> mergingEntry :
+        state.accessInEachMergingWindow(FINISHED_BITS_TAG).entrySet()) {
+      ValueState<BitSet> mergingBits = mergingEntry.getValue();
+      perWindow.put(mergingEntry.getKey(), readFinishedBits(mergingBits));
+      clearFinishedBits(mergingBits);
+    }
+    ImmutableMap<W, FinishedTriggers> mergingFinishedSets = perWindow.build();
+
+    // Let the trigger tree perform its own merge logic.
+    Trigger.OnMergeContext mergeContext = contextFactory.createOnMergeContext(
+        window, timers, rootTrigger, resultFinishedSet, mergingFinishedSets);
+    rootTrigger.invokeOnMerge(mergeContext);
+
+    persistFinishedSet(state, resultFinishedSet);
+  }
+
+  public boolean shouldFire(W window, Timers timers, StateAccessor<?> state) throws Exception {
+    // The finished set is copied and never persisted here, so this call leaves the stored
+    // finished bits untouched.
+    FinishedTriggers workingFinishedSet =
+        readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();
+    Trigger.TriggerContext fireCheckContext =
+        contextFactory.base(window, timers, rootTrigger, workingFinishedSet);
+    return rootTrigger.invokeShouldFire(fireCheckContext);
+  }
+
+  public void onFire(W window, Timers timers, StateAccessor<?> state) throws Exception {
+    // The firing precondition on shouldFire is not asserted here because it is too expensive.
+    // NOTE(review): the previous comment read "shouldFire should be false", which looks like
+    // a typo for "true" -- confirm against the callers.
+    FinishedTriggersBitSet workingFinishedSet =
+        readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();
+
+    Trigger.TriggerContext fireContext =
+        contextFactory.base(window, timers, rootTrigger, workingFinishedSet);
+    rootTrigger.invokeOnFire(fireContext);
+
+    // Write back whatever the trigger changed in the finished set.
+    persistFinishedSet(state, workingFinishedSet);
+  }
+
+  private void persistFinishedSet(
+      StateAccessor<?> state, FinishedTriggersBitSet modifiedFinishedSet) {
+    if (!isFinishedSetNeeded()) {
+      return;
+    }
+
+    ValueState<BitSet> finishedSetState = state.access(FINISHED_BITS_TAG);
+    boolean unchanged = readFinishedBits(finishedSetState).equals(modifiedFinishedSet);
+    if (unchanged) {
+      // Nothing differs from what is already stored, so skip the write.
+      return;
+    }
+
+    // An all-zero set is stored as "no value" rather than as an empty BitSet.
+    if (!modifiedFinishedSet.getBitSet().isEmpty()) {
+      finishedSetState.write(modifiedFinishedSet.getBitSet());
+    } else {
+      finishedSetState.clear();
+    }
+  }
+
+  /**
+   * Clear the finished bits.
+   */
+  public void clearFinished(StateAccessor<?> state) {
+    ValueState<BitSet> finishedBits = state.access(FINISHED_BITS_TAG);
+    clearFinishedBits(finishedBits);
+  }
+
+  /**
+   * Clear the state used for executing triggers, but leave the finished set to indicate
+   * the window is closed.
+   */
+  public void clearState(W window, Timers timers, StateAccessor<?> state) throws Exception {
+    // No copy is taken: the finished bits will be cleared anyway.
+    FinishedTriggers currentFinishedSet = readFinishedBits(state.access(FINISHED_BITS_TAG));
+    Trigger.TriggerContext clearContext =
+        contextFactory.base(window, timers, rootTrigger, currentFinishedSet);
+    rootTrigger.invokeClear(clearContext);
+  }
+
+  private boolean isFinishedSetNeeded() {
+    // TODO: If we know that no trigger in the tree will ever finish, we don't need to do the
+    // lookup. Right now, we special case this for the DefaultTrigger.
+    boolean rootIsDefaultTrigger = rootTrigger.getSpec() instanceof DefaultTrigger;
+    return !rootIsDefaultTrigger;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterAllTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterAllTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterAllTest.java
new file mode 100644
index 0000000..b591229
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterAllTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
+import org.apache.beam.sdk.util.TriggerTester;
+import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link AfterAll}.
+ */
+@RunWith(JUnit4.class)
+public class AfterAllTest {
+
+  private SimpleTriggerTester<IntervalWindow> tester;
+
+  @Test
+  public void testT1FiresFirst() throws Exception {
+    tester = TriggerTester.forTrigger(
+        AfterAll.of(
+            AfterPane.elementCountAtLeast(1),
+            AfterPane.elementCountAtLeast(2)),
+        FixedWindows.of(Duration.millis(100)));
+
+    IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100));
+
+    tester.injectElements(1);
+    assertFalse(tester.shouldFire(window));
+
+    tester.injectElements(2);
+    assertTrue(tester.shouldFire(window));
+    tester.fireIfShouldFire(window);
+    assertTrue(tester.isMarkedFinished(window));
+  }
+
+  @Test
+  public void testT2FiresFirst() throws Exception {
+    tester = TriggerTester.forTrigger(
+        AfterAll.of(
+            AfterPane.elementCountAtLeast(2),
+            AfterPane.elementCountAtLeast(1)),
+        FixedWindows.of(Duration.millis(100)));
+
+    IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100));
+
+    tester.injectElements(1);
+    assertFalse(tester.shouldFire(window));
+
+    tester.injectElements(2);
+    assertTrue(tester.shouldFire(window));
+    tester.fireIfShouldFire(window);
+    assertTrue(tester.isMarkedFinished(window));
+  }
+
+  /**
+   * Tests that the AfterAll properly unsets finished bits when a merge causes it to become
+   * unfinished.
+   */
+  @Test
+  public void testOnMergeRewinds() throws Exception {
+    tester = TriggerTester.forTrigger(
+        AfterEach.inOrder(
+            AfterAll.of(
+                AfterWatermark.pastEndOfWindow(),
+                AfterPane.elementCountAtLeast(1)),
+            Repeatedly.forever(AfterPane.elementCountAtLeast(1))),
+        Sessions.withGapDuration(Duration.millis(10)));
+
+    tester.injectElements(1);
+    IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11));
+
+    tester.injectElements(5);
+    IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15));
+
+    // Finish the AfterAll in the first window
+    tester.advanceInputWatermark(new Instant(11));
+    assertTrue(tester.shouldFire(firstWindow));
+    assertFalse(tester.shouldFire(secondWindow));
+    tester.fireIfShouldFire(firstWindow);
+
+    // Merge them; the AfterAll should not be finished
+    tester.mergeWindows();
+    IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15));
+    assertFalse(tester.isMarkedFinished(mergedWindow));
+
+    // Confirm that we are back on the first trigger by probing that it is not ready to fire
+    // after an element (with merging)
+    tester.injectElements(3);
+    tester.mergeWindows();
+    assertFalse(tester.shouldFire(mergedWindow));
+
+    // Fire the AfterAll in the merged window
+    tester.advanceInputWatermark(new Instant(15));
+    assertTrue(tester.shouldFire(mergedWindow));
+    tester.fireIfShouldFire(mergedWindow);
+
+    // Confirm that we are on the second trigger by probing
+    tester.injectElements(2);
+    tester.mergeWindows();
+    assertTrue(tester.shouldFire(mergedWindow));
+    tester.fireIfShouldFire(mergedWindow);
+    tester.injectElements(2);
+    tester.mergeWindows();
+    assertTrue(tester.shouldFire(mergedWindow));
+    tester.fireIfShouldFire(mergedWindow);
+  }
+
+  @Test
+  public void testFireDeadline() throws Exception {
+    BoundedWindow window = new IntervalWindow(new Instant(0), new Instant(10));
+
+    assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE,
+        AfterAll.of(AfterWatermark.pastEndOfWindow(), AfterPane.elementCountAtLeast(1))
+            .getWatermarkThatGuaranteesFiring(window));
+  }
+
+  @Test
+  public void testContinuation() throws Exception {
+    OnceTrigger trigger1 = AfterProcessingTime.pastFirstElementInPane();
+    OnceTrigger trigger2 = AfterWatermark.pastEndOfWindow();
+    Trigger afterAll = AfterAll.of(trigger1, trigger2);
+    assertEquals(
+        AfterAll.of(trigger1.getContinuationTrigger(), trigger2.getContinuationTrigger()),
+        afterAll.getContinuationTrigger());
+  }
+
+  @Test
+  public void testToString() {
+    Trigger trigger = AfterAll.of(StubTrigger.named("t1"), StubTrigger.named("t2"));
+    assertEquals("AfterAll.of(t1, t2)", trigger.toString());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterEachTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterEachTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterEachTest.java
new file mode 100644
index 0000000..c413c6e
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterEachTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
+import org.apache.beam.sdk.util.TriggerTester;
+import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.MockitoAnnotations;
+
+/**
+ * Tests for {@link AfterEach}.
+ */
+@RunWith(JUnit4.class)
+public class AfterEachTest {
+
+  private SimpleTriggerTester<IntervalWindow> tester;
+
+  @Before
+  public void initMocks() {
+    MockitoAnnotations.initMocks(this);
+  }
+
+  /**
+   * Tests that the {@link AfterEach} trigger fires and finishes the first trigger then the second.
+   */
+  @Test
+  public void testAfterEachInSequence() throws Exception {
+    tester = TriggerTester.forTrigger(
+        AfterEach.inOrder(
+            Repeatedly.forever(AfterPane.elementCountAtLeast(2))
+                .orFinally(AfterPane.elementCountAtLeast(3)),
+            Repeatedly.forever(AfterPane.elementCountAtLeast(5))
+                .orFinally(AfterWatermark.pastEndOfWindow())),
+            FixedWindows.of(Duration.millis(10)));
+
+    IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10));
+
+    // AfterCount(2) not ready
+    tester.injectElements(1);
+    assertFalse(tester.shouldFire(window));
+
+    // AfterCount(2) ready, not finished
+    tester.injectElements(2);
+    assertTrue(tester.shouldFire(window));
+    tester.fireIfShouldFire(window);
+    assertFalse(tester.isMarkedFinished(window));
+
+    // orFinally(AfterCount(3)) ready and will finish the first
+    tester.injectElements(1, 2, 3);
+    assertTrue(tester.shouldFire(window));
+    tester.fireIfShouldFire(window);
+    assertFalse(tester.isMarkedFinished(window));
+
+    // Now running as the second trigger
+    assertFalse(tester.shouldFire(window));
+    // This quantity of elements would fire and finish if it were erroneously still the first
+    tester.injectElements(1, 2, 3, 4);
+    assertFalse(tester.shouldFire(window));
+
+    // Now fire once
+    tester.injectElements(5);
+    assertTrue(tester.shouldFire(window));
+    tester.fireIfShouldFire(window);
+    assertFalse(tester.isMarkedFinished(window));
+
+    // This time advance the watermark to finish the whole mess.
+    tester.advanceInputWatermark(new Instant(10));
+    assertTrue(tester.shouldFire(window));
+    tester.fireIfShouldFire(window);
+    assertTrue(tester.isMarkedFinished(window));
+  }
+
+  @Test
+  public void testFireDeadline() throws Exception {
+    BoundedWindow window = new IntervalWindow(new Instant(0), new Instant(10));
+
+    assertEquals(new Instant(9),
+        AfterEach.inOrder(AfterWatermark.pastEndOfWindow(),
+                          AfterPane.elementCountAtLeast(4))
+            .getWatermarkThatGuaranteesFiring(window));
+
+    assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE,
+        AfterEach.inOrder(AfterPane.elementCountAtLeast(2), AfterWatermark.pastEndOfWindow())
+            .getWatermarkThatGuaranteesFiring(window));
+  }
+
+  @Test
+  public void testContinuation() throws Exception {
+    OnceTrigger trigger1 = AfterProcessingTime.pastFirstElementInPane();
+    OnceTrigger trigger2 = AfterWatermark.pastEndOfWindow();
+    Trigger afterEach = AfterEach.inOrder(trigger1, trigger2);
+    assertEquals(
+        Repeatedly.forever(AfterFirst.of(
+            trigger1.getContinuationTrigger(), trigger2.getContinuationTrigger())),
+        afterEach.getContinuationTrigger());
+  }
+
+  @Test
+  public void testToString() {
+    Trigger trigger = AfterEach.inOrder(
+        StubTrigger.named("t1"),
+        StubTrigger.named("t2"),
+        StubTrigger.named("t3"));
+
+    assertEquals("AfterEach.inOrder(t1, t2, t3)", trigger.toString());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterFirstTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterFirstTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterFirstTest.java
new file mode 100644
index 0000000..415060b
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterFirstTest.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
+import org.apache.beam.sdk.util.TriggerTester;
+import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+/**
+ * Tests for {@link AfterFirst}.
+ */
+@RunWith(JUnit4.class)
+public class AfterFirstTest {
+
+  @Mock private OnceTrigger mockTrigger1;
+  @Mock private OnceTrigger mockTrigger2;
+  private SimpleTriggerTester<IntervalWindow> tester;
+  private static Trigger.TriggerContext anyTriggerContext() {
+    return Mockito.<Trigger.TriggerContext>any();
+  }
+
+  @Before
+  public void initMocks() {
+    MockitoAnnotations.initMocks(this);
+  }
+
+  @Test
+  public void testNeitherShouldFireFixedWindows() throws Exception {
+    tester = TriggerTester.forTrigger(
+        AfterFirst.of(mockTrigger1, mockTrigger2), FixedWindows.of(Duration.millis(10)));
+
+    tester.injectElements(1);
+    IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10));
+
+    when(mockTrigger1.shouldFire(anyTriggerContext())).thenReturn(false);
+    when(mockTrigger2.shouldFire(anyTriggerContext())).thenReturn(false);
+
+    assertFalse(tester.shouldFire(window)); // should not fire
+    assertFalse(tester.isMarkedFinished(window)); // not finished
+  }
+
+  @Test
+  public void testOnlyT1ShouldFireFixedWindows() throws Exception {
+    tester = TriggerTester.forTrigger(
+        AfterFirst.of(mockTrigger1, mockTrigger2), FixedWindows.of(Duration.millis(10)));
+    tester.injectElements(1);
+    IntervalWindow window = new IntervalWindow(new Instant(1), new Instant(11));
+
+    when(mockTrigger1.shouldFire(anyTriggerContext())).thenReturn(true);
+    when(mockTrigger2.shouldFire(anyTriggerContext())).thenReturn(false);
+
+    assertTrue(tester.shouldFire(window)); // should fire
+
+    tester.fireIfShouldFire(window);
+    assertTrue(tester.isMarkedFinished(window));
+  }
+
+  @Test
+  public void testOnlyT2ShouldFireFixedWindows() throws Exception {
+    tester = TriggerTester.forTrigger(
+    AfterFirst.of(mockTrigger1, mockTrigger2), FixedWindows.of(Duration.millis(10)));
+    tester.injectElements(1);
+    IntervalWindow window = new IntervalWindow(new Instant(1), new Instant(11));
+
+    when(mockTrigger1.shouldFire(anyTriggerContext())).thenReturn(false);
+    when(mockTrigger2.shouldFire(anyTriggerContext())).thenReturn(true);
+    assertTrue(tester.shouldFire(window)); // should fire
+
+    tester.fireIfShouldFire(window); // now finished
+    assertTrue(tester.isMarkedFinished(window));
+  }
+
+  @Test
+  public void testBothShouldFireFixedWindows() throws Exception {
+    tester = TriggerTester.forTrigger(
+    AfterFirst.of(mockTrigger1, mockTrigger2), FixedWindows.of(Duration.millis(10)));
+    tester.injectElements(1);
+    IntervalWindow window = new IntervalWindow(new Instant(1), new Instant(11));
+
+    when(mockTrigger1.shouldFire(anyTriggerContext())).thenReturn(true);
+    when(mockTrigger2.shouldFire(anyTriggerContext())).thenReturn(true);
+    assertTrue(tester.shouldFire(window)); // should fire
+
+    tester.fireIfShouldFire(window);
+    assertTrue(tester.isMarkedFinished(window));
+  }
+
+  /**
+   * Tests that if the first trigger rewinds to be non-finished in the merged window,
+   * then it becomes the currently active trigger again, with real triggers.
+   */
+  @Test
+  public void testShouldFireAfterMerge() throws Exception {
+    tester = TriggerTester.forTrigger(
+        AfterEach.inOrder(
+            AfterFirst.of(AfterPane.elementCountAtLeast(5),
+                AfterWatermark.pastEndOfWindow()),
+            Repeatedly.forever(AfterPane.elementCountAtLeast(1))),
+        Sessions.withGapDuration(Duration.millis(10)));
+
+    // Finish the AfterFirst in the first window
+    tester.injectElements(1);
+    IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11));
+    assertFalse(tester.shouldFire(firstWindow));
+    tester.advanceInputWatermark(new Instant(11));
+    assertTrue(tester.shouldFire(firstWindow));
+    tester.fireIfShouldFire(firstWindow);
+
+    // Set up second window where it is not done
+    tester.injectElements(5);
+    IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15));
+    assertFalse(tester.shouldFire(secondWindow));
+
+    // Merge them; if the merged window were on the second trigger, it would be ready
+    tester.mergeWindows();
+    IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15));
+    assertFalse(tester.shouldFire(mergedWindow));
+
+    // Now adding 5 more elements makes the AfterFirst ready to fire
+    tester.injectElements(1, 2, 3, 4, 5);
+    tester.mergeWindows();
+    assertTrue(tester.shouldFire(mergedWindow));
+  }
+
+  @Test
+  public void testFireDeadline() throws Exception {
+    BoundedWindow window = new IntervalWindow(new Instant(0), new Instant(10));
+
+    assertEquals(new Instant(9),
+        AfterFirst.of(AfterWatermark.pastEndOfWindow(), AfterPane.elementCountAtLeast(4))
+            .getWatermarkThatGuaranteesFiring(window));
+    assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE,
+        AfterFirst.of(AfterPane.elementCountAtLeast(2), AfterPane.elementCountAtLeast(1))
+            .getWatermarkThatGuaranteesFiring(window));
+  }
+
+  @Test
+  public void testContinuation() throws Exception {
+    OnceTrigger trigger1 = AfterProcessingTime.pastFirstElementInPane();
+    OnceTrigger trigger2 = AfterWatermark.pastEndOfWindow();
+    Trigger afterFirst = AfterFirst.of(trigger1, trigger2);
+    assertEquals(
+        AfterFirst.of(trigger1.getContinuationTrigger(), trigger2.getContinuationTrigger()),
+        afterFirst.getContinuationTrigger());
+  }
+
+  @Test
+  public void testToString() {
+    Trigger trigger = AfterFirst.of(StubTrigger.named("t1"), StubTrigger.named("t2"));
+    assertEquals("AfterFirst.of(t1, t2)", trigger.toString());
+  }
+}


Mime
View raw message