beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [03/43] beam git commit: Add support for PipelineOptions parameters
Date Mon, 10 Jul 2017 04:49:33 GMT
Add support for PipelineOptions parameters

This is a step towards eliminating catch-all context parameters and
making DoFns express their fine-grained data needs.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/56cb6c51
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/56cb6c51
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/56cb6c51

Branch: refs/heads/gearpump-runner
Commit: 56cb6c51748fde6ad56522733ab10edca062e802
Parents: f75dfe7
Author: Kenneth Knowles <klk@google.com>
Authored: Tue Jun 13 10:29:50 2017 -0700
Committer: Kenneth Knowles <klk@google.com>
Committed: Fri Jun 30 15:38:47 2017 -0700

----------------------------------------------------------------------
 ...eBoundedSplittableProcessElementInvoker.java |  5 ++
 .../beam/runners/core/SimpleDoFnRunner.java     | 20 +++++++
 .../apache/beam/sdk/transforms/DoFnTester.java  |  5 ++
 .../reflect/ByteBuddyDoFnInvokerFactory.java    |  6 ++
 .../sdk/transforms/reflect/DoFnInvoker.java     | 13 +++-
 .../sdk/transforms/reflect/DoFnSignature.java   | 23 +++++++
 .../sdk/transforms/reflect/DoFnSignatures.java  | 22 ++++++-
 .../apache/beam/sdk/transforms/ParDoTest.java   | 63 ++++++++++++++++++++
 .../transforms/reflect/DoFnSignaturesTest.java  | 14 +++++
 9 files changed, 169 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/56cb6c51/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
index 2db6531..475abf2 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
@@ -118,6 +118,11 @@ public class OutputAndTimeBoundedSplittableProcessElementInvoker<
           }
 
           @Override
+          public PipelineOptions pipelineOptions() {
+            return pipelineOptions;
+          }
+
+          @Override
           public StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) {
             throw new IllegalStateException(
                 "Should not access startBundleContext() from @"

http://git-wip-us.apache.org/repos/asf/beam/blob/56cb6c51/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 7d7babd..c3bfef6 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -233,6 +233,11 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
     }
 
     @Override
+    public PipelineOptions pipelineOptions() {
+      return getPipelineOptions();
+    }
+
+    @Override
     public DoFn<InputT, OutputT>.StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) {
       return this;
     }
@@ -298,6 +303,11 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
     }
 
     @Override
+    public PipelineOptions pipelineOptions() {
+      return getPipelineOptions();
+    }
+
+    @Override
     public DoFn<InputT, OutputT>.StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) {
       throw new UnsupportedOperationException(
           "Cannot access StartBundleContext outside of @StartBundle method.");
@@ -467,6 +477,11 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
     }
 
     @Override
+    public PipelineOptions pipelineOptions() {
+      return getPipelineOptions();
+    }
+
+    @Override
     public DoFn<InputT, OutputT>.StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) {
       throw new UnsupportedOperationException("StartBundleContext parameters are not supported.");
     }
@@ -568,6 +583,11 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
     }
 
     @Override
+    public PipelineOptions pipelineOptions() {
+      return getPipelineOptions();
+    }
+
+    @Override
     public DoFn<InputT, OutputT>.StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) {
       throw new UnsupportedOperationException("StartBundleContext parameters are not supported.");
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/56cb6c51/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index 4da9a80..b2377dd 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -290,6 +290,11 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
             }
 
             @Override
+            public PipelineOptions pipelineOptions() {
+              return getPipelineOptions();
+            }
+
+            @Override
             public DoFn<InputT, OutputT>.StartBundleContext startBundleContext(
                 DoFn<InputT, OutputT> doFn) {
               throw new UnsupportedOperationException(

http://git-wip-us.apache.org/repos/asf/beam/blob/56cb6c51/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
index 4f67db4..8378204 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
@@ -90,6 +90,7 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
   public static final String PROCESS_CONTEXT_PARAMETER_METHOD = "processContext";
   public static final String ON_TIMER_CONTEXT_PARAMETER_METHOD = "onTimerContext";
   public static final String WINDOW_PARAMETER_METHOD = "window";
+  public static final String PIPELINE_OPTIONS_PARAMETER_METHOD = "pipelineOptions";
   public static final String RESTRICTION_TRACKER_PARAMETER_METHOD = "restrictionTracker";
   public static final String STATE_PARAMETER_METHOD = "state";
   public static final String TIMER_PARAMETER_METHOD = "timer";
@@ -627,6 +628,11 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
                     getExtraContextFactoryMethodDescription(TIMER_PARAMETER_METHOD, String.class)),
                 TypeCasting.to(new TypeDescription.ForLoadedType(Timer.class)));
           }
+
+          @Override
+          public StackManipulation dispatch(DoFnSignature.Parameter.PipelineOptionsParameter p) {
+            return simpleExtraContextParameter(PIPELINE_OPTIONS_PARAMETER_METHOD);
+          }
         });
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/56cb6c51/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
index ed81f42..3b22fda 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.transforms.reflect;
 
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.state.State;
 import org.apache.beam.sdk.state.Timer;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -102,7 +103,12 @@ public interface DoFnInvoker<InputT, OutputT> {
      */
     BoundedWindow window();
 
-    /** Provide a {@link DoFn.StartBundleContext} to use with the given {@link DoFn}. */
+    /** Provide {@link PipelineOptions}. */
+    PipelineOptions pipelineOptions();
+
+    /**
+     * Provide a {@link DoFn.StartBundleContext} to use with the given {@link DoFn}.
+     */
     DoFn<InputT, OutputT>.StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn);
 
     /** Provide a {@link DoFn.FinishBundleContext} to use with the given {@link DoFn}. */
@@ -140,6 +146,11 @@ public interface DoFnInvoker<InputT, OutputT> {
     }
 
     @Override
+    public PipelineOptions pipelineOptions() {
+      return null;
+    }
+
+    @Override
     public DoFn<InputT, OutputT>.StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) {
       return null;
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/56cb6c51/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
index 0b4bf90..6eeed8e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.state.State;
 import org.apache.beam.sdk.state.StateSpec;
 import org.apache.beam.sdk.state.Timer;
@@ -193,6 +194,8 @@ public abstract class DoFnSignature {
         return cases.dispatch((StateParameter) this);
       } else if (this instanceof TimerParameter) {
         return cases.dispatch((TimerParameter) this);
+      } else if (this instanceof PipelineOptionsParameter) {
+        return cases.dispatch((PipelineOptionsParameter) this);
       } else {
         throw new IllegalStateException(
             String.format("Attempt to case match on unknown %s subclass %s",
@@ -212,6 +215,7 @@ public abstract class DoFnSignature {
       ResultT dispatch(RestrictionTrackerParameter p);
       ResultT dispatch(StateParameter p);
       ResultT dispatch(TimerParameter p);
+      ResultT dispatch(PipelineOptionsParameter p);
 
       /**
        * A base class for a visitor with a default method for cases it is not interested in.
@@ -259,6 +263,11 @@ public abstract class DoFnSignature {
         public ResultT dispatch(TimerParameter p) {
           return dispatchDefault(p);
         }
+
+        @Override
+        public ResultT dispatch(PipelineOptionsParameter p) {
+          return dispatchDefault(p);
+        }
       }
     }
 
@@ -287,6 +296,11 @@ public abstract class DoFnSignature {
       return new AutoValue_DoFnSignature_Parameter_WindowParameter(windowT);
     }
 
+    /** Returns a {@link PipelineOptionsParameter}. */
+    public static PipelineOptionsParameter pipelineOptions() {
+      return new AutoValue_DoFnSignature_Parameter_PipelineOptionsParameter();
+    }
+
     /**
      * Returns a {@link RestrictionTrackerParameter}.
      */
@@ -306,6 +320,14 @@ public abstract class DoFnSignature {
     }
 
     /**
+     * Descriptor for a {@link Parameter} of a subtype of {@link PipelineOptions}.
+     */
+    @AutoValue
+    public abstract static class PipelineOptionsParameter extends Parameter {
+      PipelineOptionsParameter() {}
+    }
+
+    /**
      * Descriptor for a {@link Parameter} of type {@link DoFn.StartBundleContext}.
      *
      * <p>All such descriptors are equal.
@@ -314,6 +336,7 @@ public abstract class DoFnSignature {
     public abstract static class StartBundleContextParameter extends Parameter {
       StartBundleContextParameter() {}
     }
+
     /**
      * Descriptor for a {@link Parameter} of type {@link DoFn.FinishBundleContext}.
      *

http://git-wip-us.apache.org/repos/asf/beam/blob/56cb6c51/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
index bb191b1..1b27e66 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
@@ -42,6 +42,7 @@ import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.state.State;
 import org.apache.beam.sdk.state.StateSpec;
 import org.apache.beam.sdk.state.Timer;
@@ -78,19 +79,23 @@ public class DoFnSignatures {
       ImmutableList.of(
           Parameter.ProcessContextParameter.class,
           Parameter.WindowParameter.class,
+          Parameter.PipelineOptionsParameter.class,
           Parameter.TimerParameter.class,
           Parameter.StateParameter.class);
 
   private static final Collection<Class<? extends Parameter>>
       ALLOWED_SPLITTABLE_PROCESS_ELEMENT_PARAMETERS =
           ImmutableList.of(
-              Parameter.ProcessContextParameter.class, Parameter.RestrictionTrackerParameter.class);
+              Parameter.PipelineOptionsParameter.class,
+              Parameter.ProcessContextParameter.class,
+              Parameter.RestrictionTrackerParameter.class);
 
   private static final Collection<Class<? extends Parameter>>
       ALLOWED_ON_TIMER_PARAMETERS =
           ImmutableList.of(
               Parameter.OnTimerContextParameter.class,
               Parameter.WindowParameter.class,
+              Parameter.PipelineOptionsParameter.class,
               Parameter.TimerParameter.class,
               Parameter.StateParameter.class);
 
@@ -187,6 +192,15 @@ public class DoFnSignatures {
           extraParameters, Predicates.instanceOf(WindowParameter.class));
     }
 
+    /**
+     * Indicates whether a {@link Parameter.PipelineOptionsParameter} is
+     * known in this context.
+     */
+    public boolean hasPipelineOptionsParamter() {
+      return Iterables.any(
+          extraParameters, Predicates.instanceOf(Parameter.PipelineOptionsParameter.class));
+    }
+
     /** The window type, if any, used by this method. */
     @Nullable
     public TypeDescriptor<? extends BoundedWindow> getWindowType() {
@@ -789,6 +803,12 @@ public class DoFnSignatures {
           "Multiple %s parameters",
           BoundedWindow.class.getSimpleName());
       return Parameter.boundedWindow((TypeDescriptor<? extends BoundedWindow>) paramT);
+    } else if (PipelineOptions.class.equals(rawType)) {
+      methodErrors.checkArgument(
+          !methodContext.hasPipelineOptionsParamter(),
+          "Multiple %s parameters",
+          PipelineOptions.class.getSimpleName());
+      return Parameter.pipelineOptions();
     } else if (RestrictionTracker.class.isAssignableFrom(rawType)) {
       methodErrors.checkArgument(
           !methodContext.hasRestrictionTrackerParameter(),

http://git-wip-us.apache.org/repos/asf/beam/blob/56cb6c51/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index c67cf2a..5b60ef3 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -62,6 +62,8 @@ import org.apache.beam.sdk.coders.SetCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.state.BagState;
 import org.apache.beam.sdk.state.CombiningState;
 import org.apache.beam.sdk.state.MapState;
@@ -2942,4 +2944,65 @@ public class ParDoTest implements Serializable {
 
     // If it doesn't crash, we made it!
   }
+
+  /** A {@link PipelineOptions} subclass for testing passing to a {@link DoFn}. */
+  public interface MyOptions extends PipelineOptions {
+    @Default.String("fake option")
+    String getFakeOption();
+    void setFakeOption(String value);
+  }
+
+  @Test
+  @Category(ValidatesRunner.class)
+  public void testPipelineOptionsParameter() {
+    PCollection<String> results = pipeline
+        .apply(Create.of(1))
+        .apply(
+            ParDo.of(
+                new DoFn<Integer, String>() {
+                  @ProcessElement
+                  public void process(ProcessContext c, PipelineOptions options) {
+                    c.output(options.as(MyOptions.class).getFakeOption());
+                  }
+                }));
+
+    String testOptionValue = "not fake anymore";
+    pipeline.getOptions().as(MyOptions.class).setFakeOption(testOptionValue);
+    PAssert.that(results).containsInAnyOrder("not fake anymore");
+
+    pipeline.run();
+  }
+
+  @Test
+  @Category({ValidatesRunner.class, UsesTimersInParDo.class})
+  public void testPipelineOptionsParameterOnTimer() {
+    final String timerId = "thisTimer";
+
+    PCollection<String> results =
+        pipeline
+            .apply(Create.of(KV.of(0, 0)))
+            .apply(
+                ParDo.of(
+                    new DoFn<KV<Integer, Integer>, String>() {
+                      @TimerId(timerId)
+                      private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+                      @ProcessElement
+                      public void process(
+                          ProcessContext c, BoundedWindow w, @TimerId(timerId) Timer timer) {
+                        timer.set(w.maxTimestamp());
+                      }
+
+                      @OnTimer(timerId)
+                      public void onTimer(OnTimerContext c, PipelineOptions options) {
+                        c.output(options.as(MyOptions.class).getFakeOption());
+                      }
+                    }));
+
+    String testOptionValue = "not fake anymore";
+    pipeline.getOptions().as(MyOptions.class).setFakeOption(testOptionValue);
+    PAssert.that(results).containsInAnyOrder("not fake anymore");
+
+    pipeline.run();
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/56cb6c51/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
index cffb0ad..70c8dfd 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
@@ -29,6 +29,7 @@ import static org.junit.Assert.fail;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.state.StateSpec;
 import org.apache.beam.sdk.state.StateSpecs;
 import org.apache.beam.sdk.state.TimeDomain;
@@ -329,6 +330,19 @@ public class DoFnSignaturesTest {
   }
 
   @Test
+  public void testPipelineOptionsParameter() throws Exception {
+    DoFnSignature sig =
+        DoFnSignatures.getSignature(new DoFn<String, String>() {
+          @ProcessElement
+          public void process(ProcessContext c, PipelineOptions options) {}
+        }.getClass());
+
+    assertThat(
+        sig.processElement().extraParameters(),
+        Matchers.<Parameter>hasItem(instanceOf(Parameter.PipelineOptionsParameter.class)));
+  }
+
+  @Test
   public void testDeclAndUsageOfTimerInSuperclass() throws Exception {
     DoFnSignature sig =
         DoFnSignatures.getSignature(new DoFnOverridingAbstractTimerUse().getClass());


Mime
View raw message