beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [4/5] beam git commit: Supply PipelineOptions at Pipeline.run()
Date Sat, 29 Apr 2017 19:06:46 GMT
Supply PipelineOptions at Pipeline.run()


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/77a25452
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/77a25452
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/77a25452

Branch: refs/heads/master
Commit: 77a25452f87dd380eed981603252c238089d4439
Parents: 19aa8ba
Author: Kenneth Knowles <klk@google.com>
Authored: Mon Apr 24 12:50:57 2017 -0700
Committer: Kenneth Knowles <klk@google.com>
Committed: Sat Apr 29 12:05:34 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/flink/FlinkTestPipeline.java   |  9 +--
 .../runners/dataflow/DataflowRunnerTest.java    | 12 +++-
 .../main/java/org/apache/beam/sdk/Pipeline.java | 68 +++++++++++++-------
 .../apache/beam/sdk/testing/TestPipeline.java   |  9 +--
 .../java/org/apache/beam/sdk/PipelineTest.java  | 28 +++++---
 5 files changed, 78 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/77a25452/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java
index d6240c4..f3498be 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java
@@ -18,9 +18,7 @@
 package org.apache.beam.runners.flink;
 
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.runners.PipelineRunner;
 
 /**
  * {@link org.apache.beam.sdk.Pipeline} for testing Dataflow programs on the
@@ -61,12 +59,11 @@ public class FlinkTestPipeline extends Pipeline {
    */
   private static FlinkTestPipeline create(boolean streaming) {
     TestFlinkRunner flinkRunner = TestFlinkRunner.create(streaming);
-    return new FlinkTestPipeline(flinkRunner, flinkRunner.getPipelineOptions());
+    return new FlinkTestPipeline(flinkRunner.getPipelineOptions());
   }
 
-  private FlinkTestPipeline(PipelineRunner<? extends PipelineResult> runner,
-              PipelineOptions options) {
-    super(runner, options);
+  private FlinkTestPipeline(PipelineOptions options) {
+    super(options);
   }
 }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/77a25452/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 433fb77..c1d3fe6 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -252,7 +252,7 @@ public class DataflowRunnerTest {
     };
 
     try {
-      TestPipeline.fromOptions(PipelineOptionsFactory.fromArgs(args).create());
+      Pipeline.create(PipelineOptionsFactory.fromArgs(args).create()).run();
       fail();
     } catch (RuntimeException e) {
       assertThat(
@@ -271,7 +271,7 @@ public class DataflowRunnerTest {
     };
 
     try {
-      TestPipeline.fromOptions(PipelineOptionsFactory.fromArgs(args).create());
+      Pipeline.create(PipelineOptionsFactory.fromArgs(args).create()).run();
       fail();
     } catch (RuntimeException e) {
       assertThat(
@@ -917,7 +917,13 @@ public class DataflowRunnerTest {
     DataflowPipelineOptions streamingOptions = buildPipelineOptions();
     streamingOptions.setStreaming(true);
     streamingOptions.setRunner(DataflowRunner.class);
-    Pipeline.create(streamingOptions);
+    Pipeline p = Pipeline.create(streamingOptions);
+
+    // Instantiation of a runner prior to run() currently has a side effect of mutating the options.
+    // This could be tested by DataflowRunner.fromOptions(streamingOptions) but would not ensure
+    // that the pipeline itself had the expected options set.
+    p.run();
+
     assertEquals(
         DataflowRunner.GCS_UPLOAD_BUFFER_SIZE_BYTES_DEFAULT,
         streamingOptions.getGcsUploadBufferSizeBytes().intValue());

http://git-wip-us.apache.org/repos/asf/beam/blob/77a25452/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
index 203bd14..d578a7a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
@@ -29,10 +29,10 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.runners.PTransformOverride;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory.PTransformReplacement;
@@ -113,7 +113,6 @@ import org.slf4j.LoggerFactory;
  */
 public class Pipeline {
   private static final Logger LOG = LoggerFactory.getLogger(Pipeline.class);
-
   /**
    * Thrown during execution of a {@link Pipeline}, whenever user code within that
    * {@link Pipeline} throws an exception.
@@ -133,12 +132,23 @@ public class Pipeline {
   // Public operations.
 
   /**
+   * Constructs a pipeline from default options.
+   *
+   * @return The newly created pipeline.
+   */
+  public static Pipeline create() {
+    Pipeline pipeline = new Pipeline(PipelineOptionsFactory.create());
+    LOG.debug("Creating {}", pipeline);
+    return pipeline;
+  }
+
+  /**
    * Constructs a pipeline from the provided options.
    *
    * @return The newly created pipeline.
    */
   public static Pipeline create(PipelineOptions options) {
-    Pipeline pipeline = new Pipeline(PipelineRunner.fromOptions(options), options);
+    Pipeline pipeline = new Pipeline(options);
     LOG.debug("Creating {}", pipeline);
     return pipeline;
   }
@@ -270,9 +280,22 @@ public class Pipeline {
   }
 
   /**
-   * Runs the {@link Pipeline} using its {@link PipelineRunner}.
+   * Runs this {@link Pipeline} using the default {@link PipelineOptions} provided
+   * to {@link #create(PipelineOptions)}.
+   *
+   * <p>It is an error to call this method if the pipeline was created without
+   * a default set of options.
    */
   public PipelineResult run() {
+    return run(defaultOptions);
+  }
+
+  /**
+   * Runs this {@link Pipeline} using the given {@link PipelineOptions}, using the runner
+   * specified by the options.
+   */
+  public PipelineResult run(PipelineOptions options) {
+    PipelineRunner runner = PipelineRunner.fromOptions(options);
     // Ensure all of the nodes are fully specified before a PipelineRunner gets access to the
     // pipeline.
     LOG.debug("Running {} via {}", this, runner);
@@ -417,16 +440,14 @@ public class Pipeline {
   /////////////////////////////////////////////////////////////////////////////
   // Below here are internal operations, never called by users.
 
-  private final PipelineRunner<?> runner;
-  private final PipelineOptions options;
   private final TransformHierarchy transforms = new TransformHierarchy(this);
   private Set<String> usedFullNames = new HashSet<>();
   private CoderRegistry coderRegistry;
   private final List<String> unstableNames = new ArrayList<>();
+  private final PipelineOptions defaultOptions;
 
-  protected Pipeline(PipelineRunner<?> runner, PipelineOptions options) {
-    this.runner = runner;
-    this.options = options;
+  protected Pipeline(PipelineOptions options) {
+    this.defaultOptions = options;
   }
 
   @Override
@@ -435,6 +456,18 @@ public class Pipeline {
   }
 
   /**
+   * Returns the default {@link PipelineOptions} provided to {@link #create(PipelineOptions)}.
+   *
+   * @deprecated see BEAM-818 Remove Pipeline.getPipelineOptions. Configuration should be explicitly
+   *     provided to a transform if it is required.
+   */
+  @Deprecated
+  public PipelineOptions getOptions() {
+    return defaultOptions;
+  }
+
+
+  /**
    * Applies a {@link PTransform} to the given {@link PInput}.
    *
    * @see Pipeline#apply
@@ -516,19 +549,6 @@ public class Pipeline {
   }
 
   /**
-   * Returns the {@link PipelineOptions} provided at the time this {@link Pipeline} was created.
-   *
-   * @deprecated see BEAM-818 Remove Pipeline.getPipelineOptions. Configuration should be explicitly
-   *     provided to a transform if it is required. This method will be removed within a Major
-   *     Version and should not be used.
-   */
-  @Deprecated
-  @Experimental
-  public PipelineOptions getOptions() {
-    return options;
-  }
-
-  /**
    * Returns a unique name for a transform with the given prefix (from
    * enclosing transforms) and initial name.
    *
@@ -572,7 +592,9 @@ public class Pipeline {
 
     @Override
     public CompositeBehavior enterCompositeTransform(Node node) {
-      node.getTransform().validate(options);
+      if (node.getTransform() != null) {
+        node.getTransform().validate(options);
+      }
       return CompositeBehavior.ENTER_TRANSFORM;
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/77a25452/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
index 1273442..d45106c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
@@ -45,7 +45,6 @@ import org.apache.beam.sdk.options.ApplicationNameOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.util.IOChannelUtils;
 import org.junit.experimental.categories.Category;
@@ -257,12 +256,11 @@ public class TestPipeline extends Pipeline implements TestRule {
   }
 
   public static TestPipeline fromOptions(PipelineOptions options) {
-    return new TestPipeline(PipelineRunner.fromOptions(options), options);
+    return new TestPipeline(options);
   }
 
-  private TestPipeline(
-      final PipelineRunner<? extends PipelineResult> runner, final PipelineOptions options) {
-    super(runner, options);
+  private TestPipeline(final PipelineOptions options) {
+    super(options);
   }
 
   @Override
@@ -316,7 +314,6 @@ public class TestPipeline extends Pipeline implements TestRule {
    * Runs this {@link TestPipeline}, unwrapping any {@code AssertionError} that is raised during
    * testing.
    */
-  @Override
   public PipelineResult run() {
     checkState(
         enforcement.isPresent(),

http://git-wip-us.apache.org/repos/asf/beam/blob/77a25452/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
index 5ddfc57..fda64b5 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
@@ -87,16 +87,15 @@ public class PipelineTest {
   @Rule public ExpectedLogs logged = ExpectedLogs.none(Pipeline.class);
   @Rule public ExpectedException thrown = ExpectedException.none();
 
-  static class PipelineWrapper extends Pipeline {
-    protected PipelineWrapper(PipelineRunner<?> runner) {
-      super(runner, PipelineOptionsFactory.create());
-    }
-  }
-
   // Mock class that throws a user code exception during the call to
   // Pipeline.run().
   static class TestPipelineRunnerThrowingUserException
       extends PipelineRunner<PipelineResult> {
+
+    public static TestPipelineRunnerThrowingUserException fromOptions(PipelineOptions options) {
+      return new TestPipelineRunnerThrowingUserException();
+    }
+
     @Override
     public PipelineResult run(Pipeline pipeline) {
       Throwable t = new IllegalStateException("user code exception");
@@ -106,8 +105,13 @@ public class PipelineTest {
 
   // Mock class that throws an SDK or API client code exception during
   // the call to Pipeline.run().
-  static class TestPipelineRunnerThrowingSDKException
+  static class TestPipelineRunnerThrowingSdkException
       extends PipelineRunner<PipelineResult> {
+
+    public static TestPipelineRunnerThrowingSdkException fromOptions(PipelineOptions options) {
+      return new TestPipelineRunnerThrowingSdkException();
+    }
+
     @Override
     public PipelineResult run(Pipeline pipeline) {
       throw new IllegalStateException("SDK exception");
@@ -116,8 +120,9 @@ public class PipelineTest {
 
   @Test
   public void testPipelineUserExceptionHandling() {
-    Pipeline p = new PipelineWrapper(
-        new TestPipelineRunnerThrowingUserException());
+    PipelineOptions options = TestPipeline.testingPipelineOptions();
+    options.setRunner(TestPipelineRunnerThrowingUserException.class);
+    Pipeline p = Pipeline.create(options);
 
     // Check pipeline runner correctly catches user errors.
     thrown.expect(PipelineExecutionException.class);
@@ -128,7 +133,9 @@ public class PipelineTest {
 
   @Test
   public void testPipelineSDKExceptionHandling() {
-    Pipeline p = new PipelineWrapper(new TestPipelineRunnerThrowingSDKException());
+    PipelineOptions options = TestPipeline.testingPipelineOptions();
+    options.setRunner(TestPipelineRunnerThrowingSdkException.class);
+    Pipeline p = Pipeline.create(options);
 
     // Check pipeline runner correctly catches SDK errors.
     try {
@@ -389,6 +396,7 @@ public class PipelineTest {
 
   @Test
   public void testReplacedNames() {
+    pipeline.enableAbandonedNodeEnforcement(false);
     final PCollection<String> originalInput = pipeline.apply(Create.of("foo", "bar", "baz"));
     class OriginalTransform extends PTransform<PCollection<String>, PCollection<Long>> {
       @Override


Mime
View raw message