beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [1/9] beam git commit: Rename Triggers to TriggerTranslation
Date Wed, 24 May 2017 20:14:10 GMT
Repository: beam
Updated Branches:
  refs/heads/master 6418bcfcb -> 0637df1bc


Rename Triggers to TriggerTranslation


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4fa38e2d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4fa38e2d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4fa38e2d

Branch: refs/heads/master
Commit: 4fa38e2d590cc4472c119b57790f7a724a700e43
Parents: c8b2119
Author: Kenneth Knowles <klk@google.com>
Authored: Tue May 23 15:33:33 2017 -0700
Committer: Kenneth Knowles <klk@google.com>
Committed: Tue May 23 15:53:41 2017 -0700

----------------------------------------------------------------------
 .../operators/ApexGroupByKeyOperator.java       |   4 +-
 .../core/construction/TriggerTranslation.java   | 336 +++++++++++++++++++
 .../runners/core/construction/Triggers.java     | 336 -------------------
 .../WindowingStrategyTranslation.java           |   4 +-
 .../construction/TriggerTranslationTest.java    | 112 +++++++
 .../runners/core/construction/TriggersTest.java | 111 ------
 .../GroupAlsoByWindowViaWindowSetNewDoFn.java   |   4 +-
 .../beam/runners/core/ReduceFnTester.java       |  10 +-
 .../GroupAlsoByWindowEvaluatorFactory.java      |   4 +-
 .../SparkGroupAlsoByWindowViaWindowSet.java     |   4 +-
 ...SparkGroupAlsoByWindowViaOutputBufferFn.java |   4 +-
 11 files changed, 466 insertions(+), 463 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/4fa38e2d/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
index 1d48e20..39f681f 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
@@ -41,7 +41,7 @@ import org.apache.beam.runners.core.StateInternalsFactory;
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
-import org.apache.beam.runners.core.construction.Triggers;
+import org.apache.beam.runners.core.construction.TriggerTranslation;
 import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
 import org.apache.beam.runners.core.triggers.TriggerStateMachines;
 import org.apache.beam.sdk.coders.Coder;
@@ -163,7 +163,7 @@ public class ApexGroupByKeyOperator<K, V> implements Operator,
         windowingStrategy,
         ExecutableTriggerStateMachine.create(
             TriggerStateMachines.stateMachineForTrigger(
-                Triggers.toProto(windowingStrategy.getTrigger()))),
+                TriggerTranslation.toProto(windowingStrategy.getTrigger()))),
         stateInternalsFactory.stateInternalsForKey(key),
         timerInternals,
         new OutputWindowedValue<KV<K, Iterable<V>>>() {

http://git-wip-us.apache.org/repos/asf/beam/blob/4fa38e2d/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TriggerTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TriggerTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TriggerTranslation.java
new file mode 100644
index 0000000..777b165
--- /dev/null
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TriggerTranslation.java
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.construction;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import java.io.Serializable;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.transforms.windowing.AfterAll;
+import org.apache.beam.sdk.transforms.windowing.AfterEach;
+import org.apache.beam.sdk.transforms.windowing.AfterFirst;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.AfterSynchronizedProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark.AfterWatermarkEarlyAndLate;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark.FromEndOfWindow;
+import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
+import org.apache.beam.sdk.transforms.windowing.Never;
+import org.apache.beam.sdk.transforms.windowing.Never.NeverTrigger;
+import org.apache.beam.sdk.transforms.windowing.OrFinallyTrigger;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.ReshuffleTrigger;
+import org.apache.beam.sdk.transforms.windowing.TimestampTransform;
+import org.apache.beam.sdk.transforms.windowing.Trigger;
+import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/** Utilities for translating {@link Trigger Triggers} to and from Runner API protos. */
+@Experimental(Experimental.Kind.TRIGGER)
+public class TriggerTranslation implements Serializable {
+
+  @VisibleForTesting static final ProtoConverter CONVERTER = new ProtoConverter();
+
+  public static RunnerApi.Trigger toProto(Trigger trigger) {
+    return CONVERTER.convertTrigger(trigger);
+  }
+
+  @VisibleForTesting
+  static class ProtoConverter {
+
+    public RunnerApi.Trigger convertTrigger(Trigger trigger) {
+      Method evaluationMethod = getEvaluationMethod(trigger.getClass());
+      return tryConvert(evaluationMethod, trigger);
+    }
+
+    private RunnerApi.Trigger tryConvert(Method evaluationMethod, Trigger trigger) {
+      try {
+        return (RunnerApi.Trigger) evaluationMethod.invoke(this, trigger);
+      } catch (InvocationTargetException exc) {
+        if (exc.getCause() instanceof RuntimeException) {
+          throw (RuntimeException) exc.getCause();
+        } else {
+          throw new RuntimeException(exc.getCause());
+        }
+      } catch (IllegalAccessException exc) {
+        throw new IllegalStateException(
+            String.format("Internal error: could not invoke %s", evaluationMethod));
+      }
+    }
+
+    private Method getEvaluationMethod(Class<?> clazz) {
+      try {
+        return getClass().getDeclaredMethod("convertSpecific", clazz);
+      } catch (NoSuchMethodException exc) {
+        throw new IllegalArgumentException(
+            String.format(
+                "Cannot translate trigger class %s to a runner-API proto.",
+                clazz.getCanonicalName()),
+            exc);
+      }
+    }
+
+    private RunnerApi.Trigger convertSpecific(DefaultTrigger v) {
+      return RunnerApi.Trigger.newBuilder()
+          .setDefault(RunnerApi.Trigger.Default.getDefaultInstance())
+          .build();
+    }
+
+    private RunnerApi.Trigger convertSpecific(FromEndOfWindow v) {
+      return RunnerApi.Trigger.newBuilder()
+          .setAfterEndOfWindow(RunnerApi.Trigger.AfterEndOfWindow.newBuilder())
+          .build();
+    }
+
+    private RunnerApi.Trigger convertSpecific(NeverTrigger v) {
+      return RunnerApi.Trigger.newBuilder()
+          .setNever(RunnerApi.Trigger.Never.getDefaultInstance())
+          .build();
+    }
+
+    private RunnerApi.Trigger convertSpecific(ReshuffleTrigger v) {
+      return RunnerApi.Trigger.newBuilder()
+          .setAlways(RunnerApi.Trigger.Always.getDefaultInstance())
+          .build();
+    }
+
+    private RunnerApi.Trigger convertSpecific(AfterSynchronizedProcessingTime v) {
+      return RunnerApi.Trigger.newBuilder()
+          .setAfterSynchronizedProcessingTime(
+              RunnerApi.Trigger.AfterSynchronizedProcessingTime.getDefaultInstance())
+          .build();
+    }
+
+    private RunnerApi.TimeDomain convertTimeDomain(TimeDomain timeDomain) {
+      switch (timeDomain) {
+        case EVENT_TIME:
+          return RunnerApi.TimeDomain.EVENT_TIME;
+        case PROCESSING_TIME:
+          return RunnerApi.TimeDomain.PROCESSING_TIME;
+        case SYNCHRONIZED_PROCESSING_TIME:
+          return RunnerApi.TimeDomain.SYNCHRONIZED_PROCESSING_TIME;
+        default:
+          throw new IllegalArgumentException(String.format("Unknown time domain: %s", timeDomain));
+      }
+    }
+
+    private RunnerApi.Trigger convertSpecific(AfterFirst v) {
+      RunnerApi.Trigger.AfterAny.Builder builder = RunnerApi.Trigger.AfterAny.newBuilder();
+
+      for (Trigger subtrigger : v.subTriggers()) {
+        builder.addSubtriggers(toProto(subtrigger));
+      }
+
+      return RunnerApi.Trigger.newBuilder().setAfterAny(builder).build();
+    }
+
+    private RunnerApi.Trigger convertSpecific(AfterAll v) {
+      RunnerApi.Trigger.AfterAll.Builder builder = RunnerApi.Trigger.AfterAll.newBuilder();
+
+      for (Trigger subtrigger : v.subTriggers()) {
+        builder.addSubtriggers(toProto(subtrigger));
+      }
+
+      return RunnerApi.Trigger.newBuilder().setAfterAll(builder).build();
+    }
+
+    private RunnerApi.Trigger convertSpecific(AfterPane v) {
+      return RunnerApi.Trigger.newBuilder()
+          .setElementCount(
+              RunnerApi.Trigger.ElementCount.newBuilder().setElementCount(v.getElementCount()))
+          .build();
+    }
+
+    private RunnerApi.Trigger convertSpecific(AfterWatermarkEarlyAndLate v) {
+      RunnerApi.Trigger.AfterEndOfWindow.Builder builder =
+          RunnerApi.Trigger.AfterEndOfWindow.newBuilder();
+
+      builder.setEarlyFirings(toProto(v.getEarlyTrigger()));
+      if (v.getLateTrigger() != null) {
+        builder.setLateFirings(toProto(v.getLateTrigger()));
+      }
+
+      return RunnerApi.Trigger.newBuilder().setAfterEndOfWindow(builder).build();
+    }
+
+    private RunnerApi.Trigger convertSpecific(AfterEach v) {
+      RunnerApi.Trigger.AfterEach.Builder builder = RunnerApi.Trigger.AfterEach.newBuilder();
+
+      for (Trigger subtrigger : v.subTriggers()) {
+        builder.addSubtriggers(toProto(subtrigger));
+      }
+
+      return RunnerApi.Trigger.newBuilder().setAfterEach(builder).build();
+    }
+
+    private RunnerApi.Trigger convertSpecific(Repeatedly v) {
+      return RunnerApi.Trigger.newBuilder()
+          .setRepeat(
+              RunnerApi.Trigger.Repeat.newBuilder()
+                  .setSubtrigger(toProto(v.getRepeatedTrigger())))
+          .build();
+    }
+
+    private RunnerApi.Trigger convertSpecific(OrFinallyTrigger v) {
+      return RunnerApi.Trigger.newBuilder()
+          .setOrFinally(
+              RunnerApi.Trigger.OrFinally.newBuilder()
+                  .setMain(toProto(v.getMainTrigger()))
+                  .setFinally(toProto(v.getUntilTrigger())))
+          .build();
+    }
+
+    private RunnerApi.Trigger convertSpecific(AfterProcessingTime v) {
+      RunnerApi.Trigger.AfterProcessingTime.Builder builder =
+          RunnerApi.Trigger.AfterProcessingTime.newBuilder();
+
+      for (TimestampTransform transform : v.getTimestampTransforms()) {
+        builder.addTimestampTransforms(convertTimestampTransform(transform));
+      }
+
+      return RunnerApi.Trigger.newBuilder().setAfterProcessingTime(builder).build();
+    }
+
+    private RunnerApi.TimestampTransform convertTimestampTransform(TimestampTransform transform) {
+      if (transform instanceof TimestampTransform.Delay) {
+        return RunnerApi.TimestampTransform.newBuilder()
+            .setDelay(
+                RunnerApi.TimestampTransform.Delay.newBuilder()
+                    .setDelayMillis(((TimestampTransform.Delay) transform).getDelay().getMillis()))
+            .build();
+      } else if (transform instanceof TimestampTransform.AlignTo) {
+        TimestampTransform.AlignTo alignTo = (TimestampTransform.AlignTo) transform;
+        return RunnerApi.TimestampTransform.newBuilder()
+            .setAlignTo(
+                RunnerApi.TimestampTransform.AlignTo.newBuilder()
+                    .setPeriod(alignTo.getPeriod().getMillis())
+                    .setOffset(alignTo.getOffset().getMillis()))
+            .build();
+
+      } else {
+        throw new IllegalArgumentException(
+            String.format("Unknown %s: %s", TimestampTransform.class.getSimpleName(), transform));
+      }
+    }
+  }
+
+  public static Trigger fromProto(RunnerApi.Trigger triggerProto) {
+    switch (triggerProto.getTriggerCase()) {
+      case AFTER_ALL:
+        return AfterAll.of(protosToTriggers(triggerProto.getAfterAll().getSubtriggersList()));
+      case AFTER_ANY:
+        return AfterFirst.of(protosToTriggers(triggerProto.getAfterAny().getSubtriggersList()));
+      case AFTER_EACH:
+        return AfterEach.inOrder(
+            protosToTriggers(triggerProto.getAfterEach().getSubtriggersList()));
+      case AFTER_END_OF_WINDOW:
+        RunnerApi.Trigger.AfterEndOfWindow eowProto = triggerProto.getAfterEndOfWindow();
+
+        if (!eowProto.hasEarlyFirings() && !eowProto.hasLateFirings()) {
+          return AfterWatermark.pastEndOfWindow();
+        }
+
+        // It either has early or late firings or both; our typing in Java makes this a smidge
+        // annoying
+        if (triggerProto.getAfterEndOfWindow().hasEarlyFirings()) {
+          AfterWatermarkEarlyAndLate trigger =
+              AfterWatermark.pastEndOfWindow()
+                  .withEarlyFirings(
+                      (OnceTrigger)
+                          fromProto(triggerProto.getAfterEndOfWindow().getEarlyFirings()));
+
+          if (triggerProto.getAfterEndOfWindow().hasLateFirings()) {
+            trigger =
+                trigger.withLateFirings(
+                    (OnceTrigger)
+                        fromProto(triggerProto.getAfterEndOfWindow().getLateFirings()));
+          }
+          return trigger;
+        } else {
+          // only late firings, so return directly
+          return AfterWatermark.pastEndOfWindow()
+              .withLateFirings((OnceTrigger) fromProto(eowProto.getLateFirings()));
+        }
+      case AFTER_PROCESSING_TIME:
+        AfterProcessingTime trigger = AfterProcessingTime.pastFirstElementInPane();
+        for (RunnerApi.TimestampTransform transform :
+            triggerProto.getAfterProcessingTime().getTimestampTransformsList()) {
+          switch (transform.getTimestampTransformCase()) {
+            case ALIGN_TO:
+              trigger =
+                  trigger.alignedTo(
+                      Duration.millis(transform.getAlignTo().getPeriod()),
+                      new Instant(transform.getAlignTo().getOffset()));
+              break;
+            case DELAY:
+              trigger = trigger.plusDelayOf(Duration.millis(transform.getDelay().getDelayMillis()));
+              break;
+            case TIMESTAMPTRANSFORM_NOT_SET:
+              throw new IllegalArgumentException(
+                  String.format(
+                      "Required field 'timestamp_transform' not set in %s", transform));
+            default:
+              throw new IllegalArgumentException(
+                  String.format(
+                      "Unknown timestamp transform case: %s",
+                      transform.getTimestampTransformCase()));
+          }
+        }
+        return trigger;
+      case AFTER_SYNCHRONIZED_PROCESSING_TIME:
+        return AfterSynchronizedProcessingTime.ofFirstElement();
+      case ALWAYS:
+        return new ReshuffleTrigger();
+      case ELEMENT_COUNT:
+        return AfterPane.elementCountAtLeast(triggerProto.getElementCount().getElementCount());
+      case NEVER:
+        return Never.ever();
+      case OR_FINALLY:
+        return fromProto(triggerProto.getOrFinally().getMain())
+            .orFinally((OnceTrigger) fromProto(triggerProto.getOrFinally().getFinally()));
+      case REPEAT:
+        return Repeatedly.forever(fromProto(triggerProto.getRepeat().getSubtrigger()));
+      case DEFAULT:
+        return DefaultTrigger.of();
+      case TRIGGER_NOT_SET:
+        throw new IllegalArgumentException(
+            String.format("Required field 'trigger' not set in %s", triggerProto));
+      default:
+        throw new IllegalArgumentException(
+            String.format("Unknown trigger case: %s", triggerProto.getTriggerCase()));
+    }
+  }
+
+  private static List<Trigger> protosToTriggers(List<RunnerApi.Trigger> triggers) {
+    List<Trigger> result = Lists.newArrayList();
+    for (RunnerApi.Trigger trigger : triggers) {
+      result.add(fromProto(trigger));
+    }
+    return result;
+  }
+
+  // Do not instantiate
+  private TriggerTranslation() {}
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/4fa38e2d/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Triggers.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Triggers.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Triggers.java
deleted file mode 100644
index df6c9ed..0000000
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Triggers.java
+++ /dev/null
@@ -1,336 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core.construction;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
-import java.io.Serializable;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.List;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi;
-import org.apache.beam.sdk.state.TimeDomain;
-import org.apache.beam.sdk.transforms.windowing.AfterAll;
-import org.apache.beam.sdk.transforms.windowing.AfterEach;
-import org.apache.beam.sdk.transforms.windowing.AfterFirst;
-import org.apache.beam.sdk.transforms.windowing.AfterPane;
-import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
-import org.apache.beam.sdk.transforms.windowing.AfterSynchronizedProcessingTime;
-import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
-import org.apache.beam.sdk.transforms.windowing.AfterWatermark.AfterWatermarkEarlyAndLate;
-import org.apache.beam.sdk.transforms.windowing.AfterWatermark.FromEndOfWindow;
-import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
-import org.apache.beam.sdk.transforms.windowing.Never;
-import org.apache.beam.sdk.transforms.windowing.Never.NeverTrigger;
-import org.apache.beam.sdk.transforms.windowing.OrFinallyTrigger;
-import org.apache.beam.sdk.transforms.windowing.Repeatedly;
-import org.apache.beam.sdk.transforms.windowing.ReshuffleTrigger;
-import org.apache.beam.sdk.transforms.windowing.TimestampTransform;
-import org.apache.beam.sdk.transforms.windowing.Trigger;
-import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-/** Utilities for working with {@link Triggers Triggers}. */
-@Experimental(Experimental.Kind.TRIGGER)
-public class Triggers implements Serializable {
-
-  @VisibleForTesting static final ProtoConverter CONVERTER = new ProtoConverter();
-
-  public static RunnerApi.Trigger toProto(Trigger trigger) {
-    return CONVERTER.convertTrigger(trigger);
-  }
-
-  @VisibleForTesting
-  static class ProtoConverter {
-
-    public RunnerApi.Trigger convertTrigger(Trigger trigger) {
-      Method evaluationMethod = getEvaluationMethod(trigger.getClass());
-      return tryConvert(evaluationMethod, trigger);
-    }
-
-    private RunnerApi.Trigger tryConvert(Method evaluationMethod, Trigger trigger) {
-      try {
-        return (RunnerApi.Trigger) evaluationMethod.invoke(this, trigger);
-      } catch (InvocationTargetException exc) {
-        if (exc.getCause() instanceof RuntimeException) {
-          throw (RuntimeException) exc.getCause();
-        } else {
-          throw new RuntimeException(exc.getCause());
-        }
-      } catch (IllegalAccessException exc) {
-        throw new IllegalStateException(
-            String.format("Internal error: could not invoke %s", evaluationMethod));
-      }
-    }
-
-    private Method getEvaluationMethod(Class<?> clazz) {
-      try {
-        return getClass().getDeclaredMethod("convertSpecific", clazz);
-      } catch (NoSuchMethodException exc) {
-        throw new IllegalArgumentException(
-            String.format(
-                "Cannot translate trigger class %s to a runner-API proto.",
-                clazz.getCanonicalName()),
-            exc);
-      }
-    }
-
-    private RunnerApi.Trigger convertSpecific(DefaultTrigger v) {
-      return RunnerApi.Trigger.newBuilder()
-          .setDefault(RunnerApi.Trigger.Default.getDefaultInstance())
-          .build();
-    }
-
-    private RunnerApi.Trigger convertSpecific(FromEndOfWindow v) {
-      return RunnerApi.Trigger.newBuilder()
-          .setAfterEndOfWindow(RunnerApi.Trigger.AfterEndOfWindow.newBuilder())
-          .build();
-    }
-
-    private RunnerApi.Trigger convertSpecific(NeverTrigger v) {
-      return RunnerApi.Trigger.newBuilder()
-          .setNever(RunnerApi.Trigger.Never.getDefaultInstance())
-          .build();
-    }
-
-    private RunnerApi.Trigger convertSpecific(ReshuffleTrigger v) {
-      return RunnerApi.Trigger.newBuilder()
-          .setAlways(RunnerApi.Trigger.Always.getDefaultInstance())
-          .build();
-    }
-
-    private RunnerApi.Trigger convertSpecific(AfterSynchronizedProcessingTime v) {
-      return RunnerApi.Trigger.newBuilder()
-          .setAfterSynchronizedProcessingTime(
-              RunnerApi.Trigger.AfterSynchronizedProcessingTime.getDefaultInstance())
-          .build();
-    }
-
-    private RunnerApi.TimeDomain convertTimeDomain(TimeDomain timeDomain) {
-      switch (timeDomain) {
-        case EVENT_TIME:
-          return RunnerApi.TimeDomain.EVENT_TIME;
-        case PROCESSING_TIME:
-          return RunnerApi.TimeDomain.PROCESSING_TIME;
-        case SYNCHRONIZED_PROCESSING_TIME:
-          return RunnerApi.TimeDomain.SYNCHRONIZED_PROCESSING_TIME;
-        default:
-          throw new IllegalArgumentException(String.format("Unknown time domain: %s", timeDomain));
-      }
-    }
-
-    private RunnerApi.Trigger convertSpecific(AfterFirst v) {
-      RunnerApi.Trigger.AfterAny.Builder builder = RunnerApi.Trigger.AfterAny.newBuilder();
-
-      for (Trigger subtrigger : v.subTriggers()) {
-        builder.addSubtriggers(toProto(subtrigger));
-      }
-
-      return RunnerApi.Trigger.newBuilder().setAfterAny(builder).build();
-    }
-
-    private RunnerApi.Trigger convertSpecific(AfterAll v) {
-      RunnerApi.Trigger.AfterAll.Builder builder = RunnerApi.Trigger.AfterAll.newBuilder();
-
-      for (Trigger subtrigger : v.subTriggers()) {
-        builder.addSubtriggers(toProto(subtrigger));
-      }
-
-      return RunnerApi.Trigger.newBuilder().setAfterAll(builder).build();
-    }
-
-    private RunnerApi.Trigger convertSpecific(AfterPane v) {
-      return RunnerApi.Trigger.newBuilder()
-          .setElementCount(
-              RunnerApi.Trigger.ElementCount.newBuilder().setElementCount(v.getElementCount()))
-          .build();
-    }
-
-    private RunnerApi.Trigger convertSpecific(AfterWatermarkEarlyAndLate v) {
-      RunnerApi.Trigger.AfterEndOfWindow.Builder builder =
-          RunnerApi.Trigger.AfterEndOfWindow.newBuilder();
-
-      builder.setEarlyFirings(toProto(v.getEarlyTrigger()));
-      if (v.getLateTrigger() != null) {
-        builder.setLateFirings(toProto(v.getLateTrigger()));
-      }
-
-      return RunnerApi.Trigger.newBuilder().setAfterEndOfWindow(builder).build();
-    }
-
-    private RunnerApi.Trigger convertSpecific(AfterEach v) {
-      RunnerApi.Trigger.AfterEach.Builder builder = RunnerApi.Trigger.AfterEach.newBuilder();
-
-      for (Trigger subtrigger : v.subTriggers()) {
-        builder.addSubtriggers(toProto(subtrigger));
-      }
-
-      return RunnerApi.Trigger.newBuilder().setAfterEach(builder).build();
-    }
-
-    private RunnerApi.Trigger convertSpecific(Repeatedly v) {
-      return RunnerApi.Trigger.newBuilder()
-          .setRepeat(
-              RunnerApi.Trigger.Repeat.newBuilder()
-                  .setSubtrigger(toProto(v.getRepeatedTrigger())))
-          .build();
-    }
-
-    private RunnerApi.Trigger convertSpecific(OrFinallyTrigger v) {
-      return RunnerApi.Trigger.newBuilder()
-          .setOrFinally(
-              RunnerApi.Trigger.OrFinally.newBuilder()
-                  .setMain(toProto(v.getMainTrigger()))
-                  .setFinally(toProto(v.getUntilTrigger())))
-          .build();
-    }
-
-    private RunnerApi.Trigger convertSpecific(AfterProcessingTime v) {
-      RunnerApi.Trigger.AfterProcessingTime.Builder builder =
-          RunnerApi.Trigger.AfterProcessingTime.newBuilder();
-
-      for (TimestampTransform transform : v.getTimestampTransforms()) {
-        builder.addTimestampTransforms(convertTimestampTransform(transform));
-      }
-
-      return RunnerApi.Trigger.newBuilder().setAfterProcessingTime(builder).build();
-    }
-
-    private RunnerApi.TimestampTransform convertTimestampTransform(TimestampTransform transform) {
-      if (transform instanceof TimestampTransform.Delay) {
-        return RunnerApi.TimestampTransform.newBuilder()
-            .setDelay(
-                RunnerApi.TimestampTransform.Delay.newBuilder()
-                    .setDelayMillis(((TimestampTransform.Delay) transform).getDelay().getMillis()))
-            .build();
-      } else if (transform instanceof TimestampTransform.AlignTo) {
-        TimestampTransform.AlignTo alignTo = (TimestampTransform.AlignTo) transform;
-        return RunnerApi.TimestampTransform.newBuilder()
-            .setAlignTo(
-                RunnerApi.TimestampTransform.AlignTo.newBuilder()
-                    .setPeriod(alignTo.getPeriod().getMillis())
-                    .setOffset(alignTo.getOffset().getMillis()))
-            .build();
-
-      } else {
-        throw new IllegalArgumentException(
-            String.format("Unknown %s: %s", TimestampTransform.class.getSimpleName(), transform));
-      }
-    }
-  }
-
-  public static Trigger fromProto(RunnerApi.Trigger triggerProto) {
-    switch (triggerProto.getTriggerCase()) {
-      case AFTER_ALL:
-        return AfterAll.of(protosToTriggers(triggerProto.getAfterAll().getSubtriggersList()));
-      case AFTER_ANY:
-        return AfterFirst.of(protosToTriggers(triggerProto.getAfterAny().getSubtriggersList()));
-      case AFTER_EACH:
-        return AfterEach.inOrder(
-            protosToTriggers(triggerProto.getAfterEach().getSubtriggersList()));
-      case AFTER_END_OF_WINDOW:
-        RunnerApi.Trigger.AfterEndOfWindow eowProto = triggerProto.getAfterEndOfWindow();
-
-        if (!eowProto.hasEarlyFirings() && !eowProto.hasLateFirings()) {
-          return AfterWatermark.pastEndOfWindow();
-        }
-
-        // It either has early or late firings or both; our typing in Java makes this a smidge
-        // annoying
-        if (triggerProto.getAfterEndOfWindow().hasEarlyFirings()) {
-          AfterWatermarkEarlyAndLate trigger =
-              AfterWatermark.pastEndOfWindow()
-                  .withEarlyFirings(
-                      (OnceTrigger)
-                          fromProto(triggerProto.getAfterEndOfWindow().getEarlyFirings()));
-
-          if (triggerProto.getAfterEndOfWindow().hasLateFirings()) {
-            trigger =
-                trigger.withLateFirings(
-                    (OnceTrigger)
-                        fromProto(triggerProto.getAfterEndOfWindow().getLateFirings()));
-          }
-          return trigger;
-        } else {
-          // only late firings, so return directly
-          return AfterWatermark.pastEndOfWindow()
-              .withLateFirings((OnceTrigger) fromProto(eowProto.getLateFirings()));
-        }
-      case AFTER_PROCESSING_TIME:
-        AfterProcessingTime trigger = AfterProcessingTime.pastFirstElementInPane();
-        for (RunnerApi.TimestampTransform transform :
-            triggerProto.getAfterProcessingTime().getTimestampTransformsList()) {
-          switch (transform.getTimestampTransformCase()) {
-            case ALIGN_TO:
-              trigger =
-                  trigger.alignedTo(
-                      Duration.millis(transform.getAlignTo().getPeriod()),
-                      new Instant(transform.getAlignTo().getOffset()));
-              break;
-            case DELAY:
-              trigger = trigger.plusDelayOf(Duration.millis(transform.getDelay().getDelayMillis()));
-              break;
-            case TIMESTAMPTRANSFORM_NOT_SET:
-              throw new IllegalArgumentException(
-                  String.format(
-                      "Required field 'timestamp_transform' not set in %s", transform));
-            default:
-              throw new IllegalArgumentException(
-                  String.format(
-                      "Unknown timestamp transform case: %s",
-                      transform.getTimestampTransformCase()));
-          }
-        }
-        return trigger;
-      case AFTER_SYNCHRONIZED_PROCESSING_TIME:
-        return AfterSynchronizedProcessingTime.ofFirstElement();
-      case ALWAYS:
-        return new ReshuffleTrigger();
-      case ELEMENT_COUNT:
-        return AfterPane.elementCountAtLeast(triggerProto.getElementCount().getElementCount());
-      case NEVER:
-        return Never.ever();
-      case OR_FINALLY:
-        return fromProto(triggerProto.getOrFinally().getMain())
-            .orFinally((OnceTrigger) fromProto(triggerProto.getOrFinally().getFinally()));
-      case REPEAT:
-        return Repeatedly.forever(fromProto(triggerProto.getRepeat().getSubtrigger()));
-      case DEFAULT:
-        return DefaultTrigger.of();
-      case TRIGGER_NOT_SET:
-        throw new IllegalArgumentException(
-            String.format("Required field 'trigger' not set in %s", triggerProto));
-      default:
-        throw new IllegalArgumentException(
-            String.format("Unknown trigger case: %s", triggerProto.getTriggerCase()));
-    }
-  }
-
-  private static List<Trigger> protosToTriggers(List<RunnerApi.Trigger> triggers) {
-    List<Trigger> result = Lists.newArrayList();
-    for (RunnerApi.Trigger trigger : triggers) {
-      result.add(fromProto(trigger));
-    }
-    return result;
-  }
-
-  // Do not instantiate
-  private Triggers() {}
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/4fa38e2d/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
index 061f309..e92565f 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
@@ -210,7 +210,7 @@ public class WindowingStrategyTranslation implements Serializable {
             .setAccumulationMode(toProto(windowingStrategy.getMode()))
             .setClosingBehavior(toProto(windowingStrategy.getClosingBehavior()))
             .setAllowedLateness(windowingStrategy.getAllowedLateness().getMillis())
-            .setTrigger(Triggers.toProto(windowingStrategy.getTrigger()))
+            .setTrigger(TriggerTranslation.toProto(windowingStrategy.getTrigger()))
             .setWindowFn(windowFnSpec)
             .setWindowCoderId(
                 components.registerCoder(windowingStrategy.getWindowFn().windowCoder()));
@@ -247,7 +247,7 @@ public class WindowingStrategyTranslation implements Serializable {
     WindowFn<?, ?> windowFn = windowFnFromProto(windowFnSpec);
     TimestampCombiner timestampCombiner = timestampCombinerFromProto(proto.getOutputTime());
     AccumulationMode accumulationMode = fromProto(proto.getAccumulationMode());
-    Trigger trigger = Triggers.fromProto(proto.getTrigger());
+    Trigger trigger = TriggerTranslation.fromProto(proto.getTrigger());
     ClosingBehavior closingBehavior = fromProto(proto.getClosingBehavior());
     Duration allowedLateness = Duration.millis(proto.getAllowedLateness());
 

http://git-wip-us.apache.org/repos/asf/beam/blob/4fa38e2d/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TriggerTranslationTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TriggerTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TriggerTranslationTest.java
new file mode 100644
index 0000000..55ea87b
--- /dev/null
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TriggerTranslationTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.construction;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.ImmutableList;
+import org.apache.beam.sdk.transforms.windowing.AfterAll;
+import org.apache.beam.sdk.transforms.windowing.AfterEach;
+import org.apache.beam.sdk.transforms.windowing.AfterFirst;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.AfterSynchronizedProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
+import org.apache.beam.sdk.transforms.windowing.Never;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Trigger;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+/** Tests for utilities in {@link TriggerTranslation}. */
+@RunWith(Parameterized.class)
+public class TriggerTranslationTest {
+
+  @AutoValue
+  abstract static class ToProtoAndBackSpec {
+    abstract Trigger getTrigger();
+  }
+
+  private static ToProtoAndBackSpec toProtoAndBackSpec(Trigger trigger) {
+    return new AutoValue_TriggerTranslationTest_ToProtoAndBackSpec(trigger);
+  }
+
+  @Parameters(name = "{index}: {0}")
+  public static Iterable<ToProtoAndBackSpec> data() {
+    return ImmutableList.of(
+        // Atomic triggers
+        toProtoAndBackSpec(AfterWatermark.pastEndOfWindow()),
+        toProtoAndBackSpec(AfterPane.elementCountAtLeast(73)),
+        toProtoAndBackSpec(AfterSynchronizedProcessingTime.ofFirstElement()),
+        toProtoAndBackSpec(Never.ever()),
+        toProtoAndBackSpec(DefaultTrigger.of()),
+        toProtoAndBackSpec(AfterProcessingTime.pastFirstElementInPane()),
+        toProtoAndBackSpec(
+            AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(23))),
+        toProtoAndBackSpec(
+            AfterProcessingTime.pastFirstElementInPane()
+                .alignedTo(Duration.millis(5), new Instant(27))),
+        toProtoAndBackSpec(
+            AfterProcessingTime.pastFirstElementInPane()
+                .plusDelayOf(Duration.standardSeconds(3))
+                .alignedTo(Duration.millis(5), new Instant(27))
+                .plusDelayOf(Duration.millis(13))),
+
+        // Composite triggers
+
+        toProtoAndBackSpec(
+            AfterAll.of(AfterPane.elementCountAtLeast(79), AfterWatermark.pastEndOfWindow())),
+        toProtoAndBackSpec(
+            AfterEach.inOrder(AfterPane.elementCountAtLeast(79), AfterPane.elementCountAtLeast(3))),
+        toProtoAndBackSpec(
+            AfterFirst.of(AfterWatermark.pastEndOfWindow(), AfterPane.elementCountAtLeast(3))),
+        toProtoAndBackSpec(
+            AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(3))),
+        toProtoAndBackSpec(
+            AfterWatermark.pastEndOfWindow().withLateFirings(AfterPane.elementCountAtLeast(3))),
+        toProtoAndBackSpec(
+            AfterWatermark.pastEndOfWindow()
+                .withEarlyFirings(
+                    AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(42)))
+                .withLateFirings(AfterPane.elementCountAtLeast(3))),
+        toProtoAndBackSpec(Repeatedly.forever(AfterWatermark.pastEndOfWindow())),
+        toProtoAndBackSpec(
+            Repeatedly.forever(AfterPane.elementCountAtLeast(1))
+                .orFinally(AfterWatermark.pastEndOfWindow())));
+  }
+
+  @Parameter(0)
+  public ToProtoAndBackSpec toProtoAndBackSpec;
+
+  @Test
+  public void testToProtoAndBack() throws Exception {
+    Trigger trigger = toProtoAndBackSpec.getTrigger();
+    Trigger toProtoAndBackTrigger =
+        TriggerTranslation.fromProto(TriggerTranslation.toProto(trigger));
+
+    assertThat(toProtoAndBackTrigger, equalTo(trigger));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/4fa38e2d/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TriggersTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TriggersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TriggersTest.java
deleted file mode 100644
index cf9d40c..0000000
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TriggersTest.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core.construction;
-
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertThat;
-
-import com.google.auto.value.AutoValue;
-import com.google.common.collect.ImmutableList;
-import org.apache.beam.sdk.transforms.windowing.AfterAll;
-import org.apache.beam.sdk.transforms.windowing.AfterEach;
-import org.apache.beam.sdk.transforms.windowing.AfterFirst;
-import org.apache.beam.sdk.transforms.windowing.AfterPane;
-import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
-import org.apache.beam.sdk.transforms.windowing.AfterSynchronizedProcessingTime;
-import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
-import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
-import org.apache.beam.sdk.transforms.windowing.Never;
-import org.apache.beam.sdk.transforms.windowing.Repeatedly;
-import org.apache.beam.sdk.transforms.windowing.Trigger;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameter;
-import org.junit.runners.Parameterized.Parameters;
-
-/** Tests for utilities in {@link Triggers}. */
-@RunWith(Parameterized.class)
-public class TriggersTest {
-
-  @AutoValue
-  abstract static class ToProtoAndBackSpec {
-    abstract Trigger getTrigger();
-  }
-
-  private static ToProtoAndBackSpec toProtoAndBackSpec(Trigger trigger) {
-    return new AutoValue_TriggersTest_ToProtoAndBackSpec(trigger);
-  }
-
-  @Parameters(name = "{index}: {0}")
-  public static Iterable<ToProtoAndBackSpec> data() {
-    return ImmutableList.of(
-        // Atomic triggers
-        toProtoAndBackSpec(AfterWatermark.pastEndOfWindow()),
-        toProtoAndBackSpec(AfterPane.elementCountAtLeast(73)),
-        toProtoAndBackSpec(AfterSynchronizedProcessingTime.ofFirstElement()),
-        toProtoAndBackSpec(Never.ever()),
-        toProtoAndBackSpec(DefaultTrigger.of()),
-        toProtoAndBackSpec(AfterProcessingTime.pastFirstElementInPane()),
-        toProtoAndBackSpec(
-            AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(23))),
-        toProtoAndBackSpec(
-            AfterProcessingTime.pastFirstElementInPane()
-                .alignedTo(Duration.millis(5), new Instant(27))),
-        toProtoAndBackSpec(
-            AfterProcessingTime.pastFirstElementInPane()
-                .plusDelayOf(Duration.standardSeconds(3))
-                .alignedTo(Duration.millis(5), new Instant(27))
-                .plusDelayOf(Duration.millis(13))),
-
-        // Composite triggers
-
-        toProtoAndBackSpec(
-            AfterAll.of(AfterPane.elementCountAtLeast(79), AfterWatermark.pastEndOfWindow())),
-        toProtoAndBackSpec(
-            AfterEach.inOrder(AfterPane.elementCountAtLeast(79), AfterPane.elementCountAtLeast(3))),
-        toProtoAndBackSpec(
-            AfterFirst.of(AfterWatermark.pastEndOfWindow(), AfterPane.elementCountAtLeast(3))),
-        toProtoAndBackSpec(
-            AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(3))),
-        toProtoAndBackSpec(
-            AfterWatermark.pastEndOfWindow().withLateFirings(AfterPane.elementCountAtLeast(3))),
-        toProtoAndBackSpec(
-            AfterWatermark.pastEndOfWindow()
-                .withEarlyFirings(
-                    AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(42)))
-                .withLateFirings(AfterPane.elementCountAtLeast(3))),
-        toProtoAndBackSpec(Repeatedly.forever(AfterWatermark.pastEndOfWindow())),
-        toProtoAndBackSpec(
-            Repeatedly.forever(AfterPane.elementCountAtLeast(1))
-                .orFinally(AfterWatermark.pastEndOfWindow())));
-  }
-
-  @Parameter(0)
-  public ToProtoAndBackSpec toProtoAndBackSpec;
-
-  @Test
-  public void testToProtoAndBack() throws Exception {
-    Trigger trigger = toProtoAndBackSpec.getTrigger();
-    Trigger toProtoAndBackTrigger = Triggers.fromProto(Triggers.toProto(trigger));
-
-    assertThat(toProtoAndBackTrigger, equalTo(trigger));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/4fa38e2d/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
index 744d162..0a520bd 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
@@ -18,7 +18,7 @@
 package org.apache.beam.runners.core;
 
 import java.util.Collection;
-import org.apache.beam.runners.core.construction.Triggers;
+import org.apache.beam.runners.core.construction.TriggerTranslation;
 import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
 import org.apache.beam.runners.core.triggers.TriggerStateMachines;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -122,7 +122,7 @@ public class GroupAlsoByWindowViaWindowSetNewDoFn<
             windowingStrategy,
             ExecutableTriggerStateMachine.create(
                 TriggerStateMachines.stateMachineForTrigger(
-                    Triggers.toProto(windowingStrategy.getTrigger()))),
+                    TriggerTranslation.toProto(windowingStrategy.getTrigger()))),
             stateInternals,
             timerInternals,
             outputWindowedValue(),

http://git-wip-us.apache.org/repos/asf/beam/blob/4fa38e2d/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
index 7de8f3b..7f83eae 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
@@ -38,7 +38,7 @@ import java.util.Map;
 import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
-import org.apache.beam.runners.core.construction.Triggers;
+import org.apache.beam.runners.core.construction.TriggerTranslation;
 import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
 import org.apache.beam.runners.core.triggers.TriggerStateMachine;
 import org.apache.beam.runners.core.triggers.TriggerStateMachineRunner;
@@ -116,7 +116,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
     return new ReduceFnTester<Integer, Iterable<Integer>, W>(
         windowingStrategy,
         TriggerStateMachines.stateMachineForTrigger(
-            Triggers.toProto(windowingStrategy.getTrigger())),
+            TriggerTranslation.toProto(windowingStrategy.getTrigger())),
         SystemReduceFn.<String, Integer, W>buffering(VarIntCoder.of()),
         IterableCoder.of(VarIntCoder.of()),
         PipelineOptionsFactory.create(),
@@ -179,7 +179,8 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
 
     return combining(
         strategy,
-        TriggerStateMachines.stateMachineForTrigger(Triggers.toProto(strategy.getTrigger())),
+        TriggerStateMachines.stateMachineForTrigger(
+            TriggerTranslation.toProto(strategy.getTrigger())),
         combineFn,
         outputCoder);
   }
@@ -227,7 +228,8 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
 
     return combining(
         strategy,
-        TriggerStateMachines.stateMachineForTrigger(Triggers.toProto(strategy.getTrigger())),
+        TriggerStateMachines.stateMachineForTrigger(
+            TriggerTranslation.toProto(strategy.getTrigger())),
         combineFn,
         outputCoder,
         options,

http://git-wip-us.apache.org/repos/asf/beam/blob/4fa38e2d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
index 1a588ee..a944e75 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
@@ -32,7 +32,7 @@ import org.apache.beam.runners.core.ReduceFnRunner;
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.UnsupportedSideInputReader;
-import org.apache.beam.runners.core.construction.Triggers;
+import org.apache.beam.runners.core.construction.TriggerTranslation;
 import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
 import org.apache.beam.runners.core.triggers.TriggerStateMachines;
 import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
@@ -162,7 +162,7 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
           (CopyOnAccessInMemoryStateInternals) stepContext.stateInternals();
       DirectTimerInternals timerInternals = stepContext.timerInternals();
       RunnerApi.Trigger runnerApiTrigger =
-          Triggers.toProto(windowingStrategy.getTrigger());
+          TriggerTranslation.toProto(windowingStrategy.getTrigger());
       ReduceFnRunner<K, V, Iterable<V>, BoundedWindow> reduceFnRunner =
           new ReduceFnRunner<>(
               key,

http://git-wip-us.apache.org/repos/asf/beam/blob/4fa38e2d/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
index 815b6ba..be4f3f6 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
@@ -31,7 +31,7 @@ import org.apache.beam.runners.core.ReduceFnRunner;
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.UnsupportedSideInputReader;
-import org.apache.beam.runners.core.construction.Triggers;
+import org.apache.beam.runners.core.construction.TriggerTranslation;
 import org.apache.beam.runners.core.metrics.CounterCell;
 import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
 import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
@@ -260,7 +260,7 @@ public class SparkGroupAlsoByWindowViaWindowSet {
                               windowingStrategy,
                               ExecutableTriggerStateMachine.create(
                                   TriggerStateMachines.stateMachineForTrigger(
-                                      Triggers.toProto(windowingStrategy.getTrigger()))),
+                                      TriggerTranslation.toProto(windowingStrategy.getTrigger()))),
                               stateInternals,
                               timerInternals,
                               outputHolder,

http://git-wip-us.apache.org/repos/asf/beam/blob/4fa38e2d/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
index be02335..d2a3424 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
@@ -30,7 +30,7 @@ import org.apache.beam.runners.core.StateInternalsFactory;
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.UnsupportedSideInputReader;
-import org.apache.beam.runners.core.construction.Triggers;
+import org.apache.beam.runners.core.construction.TriggerTranslation;
 import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
 import org.apache.beam.runners.core.triggers.TriggerStateMachines;
 import org.apache.beam.runners.spark.aggregators.NamedAggregators;
@@ -92,7 +92,7 @@ public class SparkGroupAlsoByWindowViaOutputBufferFn<K, InputT, W extends Bounde
             windowingStrategy,
             ExecutableTriggerStateMachine.create(
                 TriggerStateMachines.stateMachineForTrigger(
-                    Triggers.toProto(windowingStrategy.getTrigger()))),
+                    TriggerTranslation.toProto(windowingStrategy.getTrigger()))),
             stateInternals,
             timerInternals,
             outputter,


Mime
View raw message