beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [3/6] incubator-beam git commit: Add DoFnSignatures analysis of timer declarations
Date Thu, 20 Oct 2016 01:02:12 GMT
Add DoFnSignatures analysis of timer declarations


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ccefc6f2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ccefc6f2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ccefc6f2

Branch: refs/heads/master
Commit: ccefc6f22050b0da1471e2b68a0787e9a861639c
Parents: 1b780c2
Author: Kenneth Knowles <klk@google.com>
Authored: Thu Oct 13 19:53:31 2016 -0700
Committer: Kenneth Knowles <klk@google.com>
Committed: Wed Oct 19 17:52:21 2016 -0700

----------------------------------------------------------------------
 .../sdk/transforms/reflect/DoFnSignature.java   |  45 ++++
 .../sdk/transforms/reflect/DoFnSignatures.java  |  95 +++++++-
 .../transforms/reflect/DoFnSignaturesTest.java  | 238 +++++++++++++++++++
 3 files changed, 376 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ccefc6f2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
index 5e261a4..e54b361 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
@@ -30,6 +30,7 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.util.TimerSpec;
 import org.apache.beam.sdk.util.state.StateSpec;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TypeDescriptor;
@@ -70,6 +71,9 @@ public abstract class DoFnSignature {
   @Nullable
   public abstract LifecycleMethod teardown();
 
+  /** Timer declarations present on the {@link DoFn} class. Immutable. */
+  public abstract Map<String, TimerDeclaration> timerDeclarations();
+
   /** Details about this {@link DoFn}'s {@link DoFn.GetInitialRestriction} method. */
   @Nullable
   public abstract GetInitialRestrictionMethod getInitialRestriction();
@@ -86,6 +90,10 @@ public abstract class DoFnSignature {
   @Nullable
   public abstract NewTrackerMethod newTracker();
 
+  /** Details about this {@link DoFn}'s {@link DoFn.OnTimer} methods. */
+  @Nullable
+  public abstract Map<String, OnTimerMethod> onTimerMethods();
+
   static Builder builder() {
     return new AutoValue_DoFnSignature.Builder();
   }
@@ -104,6 +112,8 @@ public abstract class DoFnSignature {
     abstract Builder setGetRestrictionCoder(GetRestrictionCoderMethod getRestrictionCoder);
     abstract Builder setNewTracker(NewTrackerMethod newTracker);
     abstract Builder setStateDeclarations(Map<String, StateDeclaration> stateDeclarations);
+    abstract Builder setTimerDeclarations(Map<String, TimerDeclaration> timerDeclarations);
+    abstract Builder setOnTimerMethods(Map<String, OnTimerMethod> onTimerMethods);
     abstract DoFnSignature build();
   }
 
@@ -160,6 +170,41 @@ public abstract class DoFnSignature {
     }
   }
 
+  /** Describes a {@link DoFn.OnTimer} method. */
+  @AutoValue
+  public abstract static class OnTimerMethod implements DoFnMethod {
+
+    /** The id on the method's {@link DoFn.TimerId} annotation. */
+    public abstract String id();
+
+    /** The annotated method itself. */
+    @Override
+    public abstract Method targetMethod();
+
+    /** Types of optional parameters of the annotated method, in the order they appear. */
+    public abstract List<Parameter> extraParameters();
+
+    static OnTimerMethod create(Method targetMethod, String id, List<Parameter> extraParameters) {
+      return new AutoValue_DoFnSignature_OnTimerMethod(
+          id, targetMethod, Collections.unmodifiableList(extraParameters));
+    }
+  }
+
+  /**
+   * Describes a timer declaration; a field of type {@link TimerSpec} annotated with
+   * {@link DoFn.TimerId}.
+   */
+  @AutoValue
+  public abstract static class TimerDeclaration {
+    public abstract String id();
+    public abstract Field field();
+
+    static TimerDeclaration create(String id, Field field) {
+      return new AutoValue_DoFnSignature_TimerDeclaration(id, field);
+    }
+  }
+
+
   /** Describes a {@link DoFn.StartBundle} or {@link DoFn.FinishBundle} method. */
   @AutoValue
   public abstract static class BundleMethod implements DoFnMethod {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ccefc6f2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
index b87254e..9611dd4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkState;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
 import com.google.common.reflect.TypeParameter;
 import com.google.common.reflect.TypeToken;
 import java.lang.annotation.Annotation;
@@ -33,6 +34,7 @@ import java.lang.reflect.Type;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
@@ -42,8 +44,12 @@ import javax.annotation.Nullable;
 import javax.swing.plaf.nimbus.State;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature.OnTimerMethod;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature.StateDeclaration;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature.TimerDeclaration;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.TimerSpec;
 import org.apache.beam.sdk.util.common.ReflectHelpers;
 import org.apache.beam.sdk.util.state.StateSpec;
 import org.apache.beam.sdk.values.PCollection;
@@ -96,6 +102,14 @@ public class DoFnSignatures {
     }
     errors.checkNotNull(inputT, "Unable to determine input type");
 
+    // Find the state and timer declarations in advance of validating
+    // method parameter lists
+    Map<String, StateDeclaration> stateDeclarations = analyzeStateDeclarations(errors, fnClass);
+    builder.setStateDeclarations(stateDeclarations);
+
+    Map<String, TimerDeclaration> timerDeclarations = analyzeTimerDeclarations(errors, fnClass);
+    builder.setTimerDeclarations(timerDeclarations);
+
     Method processElementMethod =
         findAnnotatedMethod(errors, DoFn.ProcessElement.class, fnClass, true);
     Method startBundleMethod = findAnnotatedMethod(errors, DoFn.StartBundle.class, fnClass, false);
@@ -112,6 +126,41 @@ public class DoFnSignatures {
         findAnnotatedMethod(errors, DoFn.GetRestrictionCoder.class, fnClass, false);
     Method newTrackerMethod = findAnnotatedMethod(errors, DoFn.NewTracker.class, fnClass, false);
 
+    Collection<Method> onTimerMethods =
+        declaredMethodsWithAnnotation(DoFn.OnTimer.class, fnClass, DoFn.class);
+    HashMap<String, DoFnSignature.OnTimerMethod> onTimerMethodMap =
+            Maps.newHashMapWithExpectedSize(onTimerMethods.size());
+    for (Method onTimerMethod : onTimerMethods) {
+      String id = onTimerMethod.getAnnotation(DoFn.OnTimer.class).value();
+      errors.checkArgument(
+          timerDeclarations.containsKey(id),
+          "Callback %s is for for undeclared timer %s",
+          onTimerMethod,
+          id);
+
+      TimerDeclaration timerDecl = timerDeclarations.get(id);
+      errors.checkArgument(
+          timerDecl.field().getDeclaringClass().equals(onTimerMethod.getDeclaringClass()),
+          "Callback %s is for timer %s declared in a different class %s."
+              + " Timer callbacks must be declared in the same lexical scope as their timer",
+          onTimerMethod,
+          id,
+          timerDecl.field().getDeclaringClass().getCanonicalName());
+
+      onTimerMethodMap.put(id, OnTimerMethod.create(onTimerMethod, id, Collections.EMPTY_LIST));
+    }
+    builder.setOnTimerMethods(onTimerMethodMap);
+
+    // Check the converse - that all timers have a callback. This could be relaxed to only
+    // those timers used in methods, once method parameter lists support timers.
+    for (TimerDeclaration decl : timerDeclarations.values()) {
+      errors.checkArgument(
+          onTimerMethodMap.containsKey(decl.id()),
+          "No callback registered via %s for timer %s",
+          DoFn.OnTimer.class.getSimpleName(),
+          decl.id());
+    }
+
     ErrorReporter processElementErrors =
         errors.forMethod(DoFn.ProcessElement.class, processElementMethod);
     DoFnSignature.ProcessElementMethod processElement =
@@ -183,8 +232,6 @@ public class DoFnSignatures {
 
     builder.setIsBoundedPerElement(inferBoundedness(fnToken, processElement, errors));
 
-    builder.setStateDeclarations(analyzeStateDeclarations(errors, fnClass));
-
     DoFnSignature signature = builder.build();
 
     // Additional validation for splittable DoFn's.
@@ -556,6 +603,50 @@ public class DoFnSignatures {
     return DoFnSignature.SplitRestrictionMethod.create(m, restrictionT);
   }
 
+  private static ImmutableMap<String, TimerDeclaration> analyzeTimerDeclarations(
+      ErrorReporter errors, Class<?> fnClazz) {
+    Map<String, DoFnSignature.TimerDeclaration> declarations = new HashMap<>();
+    for (Field field : declaredFieldsWithAnnotation(DoFn.TimerId.class, fnClazz, DoFn.class)) {
+      String id = field.getAnnotation(DoFn.TimerId.class).value();
+      validateTimerField(errors, declarations, id, field);
+      declarations.put(id, DoFnSignature.TimerDeclaration.create(id, field));
+    }
+
+    return ImmutableMap.copyOf(declarations);
+  }
+
+  /**
+   * Returns successfully if the field is valid, otherwise throws an exception via
+   * its {@link ErrorReporter} parameter describing validation failures for the
+   * timer declaration.
+   */
+  private static void validateTimerField(
+      ErrorReporter errors, Map<String, TimerDeclaration> declarations, String id, Field field) {
+    if (declarations.containsKey(id)) {
+      errors.throwIllegalArgument(
+          "Duplicate %s \"%s\", used on both of [%s] and [%s]",
+          DoFn.TimerId.class.getSimpleName(),
+          id,
+          field.toString(),
+          declarations.get(id).field().toString());
+    }
+
+    Class<?> timerSpecRawType = field.getType();
+    if (!(timerSpecRawType.equals(TimerSpec.class))) {
+      errors.throwIllegalArgument(
+          "%s annotation on non-%s field [%s]",
+          DoFn.TimerId.class.getSimpleName(),
+          TimerSpec.class.getSimpleName(),
+          field.toString());
+    }
+
+    if (!Modifier.isFinal(field.getModifiers())) {
+      errors.throwIllegalArgument(
+          "Non-final field %s annotated with %s. Timer declarations must be final.",
+          field.toString(), DoFn.TimerId.class.getSimpleName());
+    }
+  }
+
   /** Generates a type token for {@code Coder<T>} given {@code T}. */
   private static <T> TypeToken<Coder<T>> coderTypeOf(TypeToken<T> elementT) {
     return new TypeToken<Coder<T>>() {}.where(new TypeParameter<T>() {}, elementT);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ccefc6f2/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
index 230e6ca..9813af5 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
@@ -25,7 +25,11 @@ import com.google.common.reflect.TypeToken;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.TimerId;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignaturesTestUtils.FakeDoFn;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.TimerSpec;
+import org.apache.beam.sdk.util.TimerSpecs;
 import org.apache.beam.sdk.util.state.StateSpec;
 import org.apache.beam.sdk.util.state.StateSpecs;
 import org.apache.beam.sdk.util.state.ValueState;
@@ -132,6 +136,191 @@ public class DoFnSignaturesTest {
   }
 
   @Test
+  public void testTimerIdWithWrongType() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("TimerId");
+    thrown.expectMessage("TimerSpec");
+    thrown.expectMessage("bizzle");
+    DoFnSignatures.INSTANCE.getSignature(
+        new DoFn<String, String>() {
+          @TimerId("foo")
+          private final String bizzle = "bazzle";
+
+          @ProcessElement
+          public void foo(ProcessContext context) {}
+        }.getClass());
+  }
+
+  @Test
+  public void testTimerIdNoCallback() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("No callback registered");
+    thrown.expectMessage("my-timer-id");
+    DoFnSignature sig =
+        DoFnSignatures.INSTANCE.getSignature(
+            new DoFn<KV<String, Integer>, Long>() {
+              @TimerId("my-timer-id")
+              private final TimerSpec myfield1 = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+              @ProcessElement
+              public void foo(ProcessContext context) {}
+            }.getClass());
+  }
+
+  @Test
+  public void testOnTimerNoDeclaration() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Callback");
+    thrown.expectMessage("undeclared timer");
+    thrown.expectMessage("onTimerFoo");
+    thrown.expectMessage("my-timer-id");
+    DoFnSignature sig =
+        DoFnSignatures.INSTANCE.getSignature(
+            new DoFn<KV<String, Integer>, Long>() {
+              @OnTimer("my-timer-id")
+              public void onTimerFoo() {}
+
+              @ProcessElement
+              public void foo(ProcessContext context) {}
+            }.getClass());
+  }
+
+  @Test
+  public void testOnTimerDeclaredInSuperclass() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Callback");
+    thrown.expectMessage("declared in a different class");
+    thrown.expectMessage("my-timer-id");
+    DoFnSignature sig =
+        DoFnSignatures.INSTANCE.getSignature(
+            new DoFnDeclaringMyTimerId() {
+              @OnTimer("my-timer-id")
+              public void onTimerFoo() {}
+
+              @ProcessElement
+              public void foo(ProcessContext context) {}
+            }.getClass());
+  }
+
+  @Test
+  public void testOnTimerDeclaredInSubclass() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Callback");
+    thrown.expectMessage("declared in a different class");
+    thrown.expectMessage("my-timer-id");
+    DoFnSignature sig =
+        DoFnSignatures.INSTANCE.getSignature(
+            new DoFnUsingMyTimerId() {
+              @TimerId("my-timer-id")
+              private final TimerSpec myfield1 = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+              @ProcessElement
+              public void foo(ProcessContext context) {}
+            }.getClass());
+  }
+
+  /**
+   * In this particular test, the super class annotated both the timer and the callback,
+   * and the subclass overrides an abstract method. This is allowed.
+   */
+  @Test
+  public void testOnTimerDeclaredAndUsedInSuperclass() throws Exception {
+    DoFnSignature sig =
+        DoFnSignatures.INSTANCE.getSignature(
+            new DoFnOverridingAbstractCallback().getClass());
+
+    assertThat(sig.timerDeclarations().size(), equalTo(1));
+    assertThat(sig.onTimerMethods().size(), equalTo(1));
+
+    DoFnSignature.TimerDeclaration decl = sig.timerDeclarations().get("my-timer-id");
+    DoFnSignature.OnTimerMethod callback = sig.onTimerMethods().get("my-timer-id");
+
+    assertThat(
+        decl.field(),
+        equalTo(DoFnDeclaringMyTimerIdAndAbstractCallback.class.getDeclaredField("myTimerSpec")));
+
+    // The method we pull out is the superclass method; this is what allows validation to remain
+    // simple. The later invokeDynamic instruction causes it to invoke the actual implementation.
+    assertThat(
+        callback.targetMethod(),
+        equalTo(DoFnDeclaringMyTimerIdAndAbstractCallback.class.getDeclaredMethod("onMyTimer")));
+  }
+
+  @Test
+  public void testTimerIdDuplicate() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Duplicate");
+    thrown.expectMessage("TimerId");
+    thrown.expectMessage("my-timer-id");
+    thrown.expectMessage("myfield1");
+    thrown.expectMessage("myfield2");
+    DoFnSignature sig =
+        DoFnSignatures.INSTANCE.getSignature(
+            new DoFn<KV<String, Integer>, Long>() {
+              @TimerId("my-timer-id")
+              private final TimerSpec myfield1 = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+              @TimerId("my-timer-id")
+              private final TimerSpec myfield2 = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+              @ProcessElement
+              public void foo(ProcessContext context) {}
+            }.getClass());
+  }
+
+  @Test
+  public void testTimerIdNonFinal() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Timer declarations must be final");
+    thrown.expectMessage("Non-final field");
+    thrown.expectMessage("myfield");
+    DoFnSignature sig =
+        DoFnSignatures.INSTANCE.getSignature(
+            new DoFn<KV<String, Integer>, Long>() {
+              @TimerId("my-timer-id")
+              private TimerSpec myfield = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+              @ProcessElement
+              public void foo(ProcessContext context) {}
+            }.getClass());
+  }
+
+  @Test
+  public void testSimpleTimerIdAnonymousDoFn() throws Exception {
+    DoFnSignature sig =
+        DoFnSignatures.INSTANCE.getSignature(
+            new DoFn<KV<String, Integer>, Long>() {
+              @TimerId("foo")
+              private final TimerSpec bizzle = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+              @ProcessElement
+              public void foo(ProcessContext context) {}
+
+              @OnTimer("foo")
+              public void onFoo() {}
+            }.getClass());
+
+    assertThat(sig.timerDeclarations().size(), equalTo(1));
+    DoFnSignature.TimerDeclaration decl = sig.timerDeclarations().get("foo");
+
+    assertThat(decl.id(), equalTo("foo"));
+    assertThat(decl.field().getName(), equalTo("bizzle"));
+  }
+
+  @Test
+  public void testSimpleTimerIdNamedDoFn() throws Exception {
+    // Test classes at the bottom of the file
+    DoFnSignature sig =
+        DoFnSignatures.INSTANCE.signatureForDoFn(new DoFnForTestSimpleTimerIdNamedDoFn());
+
+    assertThat(sig.timerDeclarations().size(), equalTo(1));
+    DoFnSignature.TimerDeclaration decl = sig.timerDeclarations().get("foo");
+
+    assertThat(decl.id(), equalTo("foo"));
+    assertThat(
+        decl.field(), equalTo(DoFnForTestSimpleTimerIdNamedDoFn.class.getDeclaredField("bizzle")));
+  }
+
   public void testStateIdWithWrongType() throws Exception {
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage("StateId");
@@ -265,4 +454,53 @@ public class DoFnSignaturesTest {
     @ProcessElement
     public void foo(ProcessContext context) {}
   }
+
+  private static class DoFnForTestSimpleTimerIdNamedDoFn extends DoFn<KV<String, Integer>, Long> {
+    @TimerId("foo")
+    private final TimerSpec bizzle = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+    @ProcessElement
+    public void foo(ProcessContext context) {}
+
+    @OnTimer("foo")
+    public void onFoo() {}
+  }
+
+  private abstract static class DoFnDeclaringMyTimerId extends DoFn<KV<String, Integer>, Long> {
+    @TimerId("my-timer-id")
+    private final TimerSpec bizzle = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+    @ProcessElement
+    public void foo(ProcessContext context) {}
+  }
+
+  private abstract static class DoFnUsingMyTimerId extends DoFn<KV<String, Integer>, Long> {
+    @OnTimer("my-timer-id")
+    public void onMyTimer() {}
+
+    @ProcessElement
+    public void foo(ProcessContext context) {}
+  }
+
+  private abstract static class DoFnDeclaringMyTimerIdAndAbstractCallback
+      extends DoFn<KV<String, Integer>, Long> {
+    @TimerId("my-timer-id")
+    private final TimerSpec myTimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+    @ProcessElement
+    public void foo(ProcessContext context) {}
+
+    @OnTimer("my-timer-id")
+    public abstract void onMyTimer();
+  }
+
+  private static class DoFnOverridingAbstractCallback extends
+      DoFnDeclaringMyTimerIdAndAbstractCallback {
+
+    @Override
+    public void onMyTimer() {}
+
+    @ProcessElement
+    public void foo(ProcessContext context) {}
+  }
 }


Mime
View raw message