beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jamesmal...@apache.org
Subject [15/50] [abbrv] incubator-beam git commit: Make TestPipeline slightly less DataflowPipelineRunner-centric
Date Fri, 26 Feb 2016 22:54:52 GMT
Make TestPipeline slightly less DataflowPipelineRunner-centric

----Release Notes----

[]
-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=115302769


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/13a042ae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/13a042ae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/13a042ae

Branch: refs/heads/master
Commit: 13a042aed7d01a126a1ff7ecb66e723474191fe0
Parents: c0a814b
Author: klk <klk@google.com>
Authored: Mon Feb 22 21:17:40 2016 -0800
Committer: Davor Bonaci <davorbonaci@users.noreply.github.com>
Committed: Thu Feb 25 23:58:26 2016 -0800

----------------------------------------------------------------------
 .../dataflow/sdk/testing/TestPipeline.java      | 69 ++++++++++++--------
 .../dataflow/sdk/testing/TestPipelineTest.java  | 18 +++--
 2 files changed, 53 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/13a042ae/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/TestPipeline.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/TestPipeline.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/TestPipeline.java
index 05b5bad..a05a778 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/TestPipeline.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/TestPipeline.java
@@ -19,22 +19,23 @@ package com.google.cloud.dataflow.sdk.testing;
 import com.google.cloud.dataflow.sdk.Pipeline;
 import com.google.cloud.dataflow.sdk.PipelineResult;
 import com.google.cloud.dataflow.sdk.options.ApplicationNameOptions;
+import com.google.cloud.dataflow.sdk.options.GcpOptions;
 import com.google.cloud.dataflow.sdk.options.PipelineOptions;
 import com.google.cloud.dataflow.sdk.options.PipelineOptions.CheckEnabled;
 import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
+import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
 import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
+import com.google.cloud.dataflow.sdk.util.TestCredential;
 import com.google.common.base.Optional;
 import com.google.common.collect.Iterators;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.IOException;
 import java.util.Iterator;
 
+import javax.annotation.Nullable;
+
 /**
  * A creator of test pipelines that can be used inside of tests that can be
  * configured to run locally or against the live service.
@@ -67,7 +68,6 @@ import java.util.Iterator;
  */
 public class TestPipeline extends Pipeline {
   private static final String PROPERTY_DATAFLOW_OPTIONS = "dataflowOptions";
-  private static final Logger LOG = LoggerFactory.getLogger(TestPipeline.class);
   private static final ObjectMapper MAPPER = new ObjectMapper();
 
   /**
@@ -77,28 +77,22 @@ public class TestPipeline extends Pipeline {
    * {@link Pipeline#run} to execute the pipeline and check the tests.
    */
   public static TestPipeline create() {
-    if (isIntegrationTest()) {
-      TestDataflowPipelineOptions options = getPipelineOptions();
-      LOG.info("Using passed in options: " + options);
-      options.setStableUniqueNames(CheckEnabled.ERROR);
-      return new TestPipeline(TestDataflowPipelineRunner.fromOptions(options), options);
-    } else {
-      DirectPipelineRunner directRunner = DirectPipelineRunner.createForTest();
-      directRunner.getPipelineOptions().setAppName(getAppName());
-      directRunner.getPipelineOptions().setStableUniqueNames(CheckEnabled.ERROR);
-      return new TestPipeline(directRunner, directRunner.getPipelineOptions());
-    }
+    return fromOptions(testingPipelineOptions());
+  }
+
+  public static TestPipeline fromOptions(PipelineOptions options) {
+    return new TestPipeline(PipelineRunner.fromOptions(options), options);
   }
 
   /**
-   * Returns whether this test is running on the Cloud Dataflow service as described
-   * in {@link TestPipeline}.
+   * Returns whether a {@link TestPipeline} supports dynamic work rebalancing, and thus tests
+   * of dynamic work rebalancing are expected to pass.
    */
-  public static boolean isIntegrationTest() {
-    return Boolean.parseBoolean(System.getProperty("runIntegrationTestOnService"));
+  public boolean supportsDynamicWorkRebalancing() {
+    return getRunner() instanceof DataflowPipelineRunner;
   }
 
-  TestPipeline(PipelineRunner<? extends PipelineResult> runner, PipelineOptions options) {
+  private TestPipeline(PipelineRunner<? extends PipelineResult> runner, PipelineOptions options) {
     super(runner, options);
   }
 
@@ -126,14 +120,28 @@ public class TestPipeline extends Pipeline {
   }
 
   /**
-   * Creates PipelineOptions for testing with a DataflowPipelineRunner.
+   * Creates {@link PipelineOptions} for testing.
    */
-  public static TestDataflowPipelineOptions getPipelineOptions() {
+  public static PipelineOptions testingPipelineOptions() {
     try {
-      TestDataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(
-              MAPPER.readValue(System.getProperty(PROPERTY_DATAFLOW_OPTIONS), String[].class))
-          .as(TestDataflowPipelineOptions.class);
-      options.setAppName(getAppName());
+      @Nullable String systemDataflowOptions = System.getProperty(PROPERTY_DATAFLOW_OPTIONS);
+      PipelineOptions options =
+          systemDataflowOptions == null
+              ? PipelineOptionsFactory.create()
+              : PipelineOptionsFactory.fromArgs(
+                      MAPPER.readValue(
+                          System.getProperty(PROPERTY_DATAFLOW_OPTIONS), String[].class))
+                  .as(PipelineOptions.class);
+
+      options.as(ApplicationNameOptions.class).setAppName(getAppName());
+      if (isIntegrationTest()) {
+        // TODO: adjust everyone's integration test frameworks to set the runner class via the
+        // pipeline options via PROPERTY_DATAFLOW_OPTIONS
+        options.setRunner(TestDataflowPipelineRunner.class);
+      } else {
+        options.as(GcpOptions.class).setGcpCredential(new TestCredential());
+      }
+      options.setStableUniqueNames(CheckEnabled.ERROR);
       return options;
     } catch (IOException e) {
       throw new RuntimeException("Unable to instantiate test options from system property "
@@ -141,6 +149,13 @@ public class TestPipeline extends Pipeline {
     }
   }
 
+  /**
+   * Returns whether a {@link TestPipeline} should be treated as an integration test.
+   */
+  private static boolean isIntegrationTest() {
+    return Boolean.parseBoolean(System.getProperty("runIntegrationTestOnService"));
+  }
+
   /** Returns the class + method name of the test, or a default name. */
   private static String getAppName() {
     Optional<StackTraceElement> stackTraceElement = findCallersStackTrace();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/13a042ae/sdk/src/test/java/com/google/cloud/dataflow/sdk/testing/TestPipelineTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/testing/TestPipelineTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/testing/TestPipelineTest.java
index d74ba6a..397920a 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/testing/TestPipelineTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/testing/TestPipelineTest.java
@@ -21,6 +21,10 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 
+import com.google.cloud.dataflow.sdk.options.ApplicationNameOptions;
+import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import com.google.cloud.dataflow.sdk.options.GcpOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
 import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -57,15 +61,17 @@ public class TestPipelineTest {
       "--diskSizeGb=2"
     });
     System.getProperties().put("dataflowOptions", stringOptions);
-    TestDataflowPipelineOptions options = TestPipeline.getPipelineOptions();
+    DataflowPipelineOptions options =
+        TestPipeline.testingPipelineOptions().as(DataflowPipelineOptions.class);
     assertEquals(DataflowPipelineRunner.class, options.getRunner());
     assertThat(options.getJobName(), startsWith("testpipelinetest0testcreationofpipelineoptions-"));
-    assertEquals("testProject", options.getProject());
+    assertEquals("testProject", options.as(GcpOptions.class).getProject());
     assertEquals("testApiRootUrl", options.getApiRootUrl());
     assertEquals("testDataflowEndpoint", options.getDataflowEndpoint());
     assertEquals("testTempLocation", options.getTempLocation());
     assertEquals("testServiceAccountName", options.getServiceAccountName());
-    assertEquals("testServiceAccountKeyfile", options.getServiceAccountKeyfile());
+    assertEquals(
+        "testServiceAccountKeyfile", options.as(GcpOptions.class).getServiceAccountKeyfile());
     assertEquals("testZone", options.getZone());
     assertEquals(2, options.getDiskSizeGb());
   }
@@ -75,11 +81,9 @@ public class TestPipelineTest {
         ObjectMapper mapper = new ObjectMapper();
     String stringOptions = mapper.writeValueAsString(new String[]{});
     System.getProperties().put("dataflowOptions", stringOptions);
-    TestDataflowPipelineOptions options = TestPipeline.getPipelineOptions();
-    assertThat(options.getAppName(), startsWith(
+    PipelineOptions options = TestPipeline.testingPipelineOptions();
+    assertThat(options.as(ApplicationNameOptions.class).getAppName(), startsWith(
         "TestPipelineTest-testCreationOfPipelineOptionsFromReallyVerboselyNamedTestCase"));
-    assertThat(options.getJobName(), startsWith(
-        "testpipelinetest0testcreationofpipelineoptionsfrom"));
   }
 
   @Test


Mime
View raw message