beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lc...@apache.org
Subject [2/3] incubator-beam git commit: Provide a better error message for non-existing gcpTempLocation
Date Tue, 20 Dec 2016 21:04:44 GMT
Provide a better error message for non-existing gcpTempLocation

gcpTempLocation will default to using the value for tempLocation, as long
as the value is a valid GCP path. Non-valid GCP paths are silently
discarded.

This change removes existence validation from the default value logic
such that downstream validation can provide a better error message.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ef1a8583
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ef1a8583
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ef1a8583

Branch: refs/heads/master
Commit: ef1a858347e475cd15f7dcd8873464f506527b2a
Parents: 2f4b803
Author: Scott Wegner <swegner@google.com>
Authored: Tue Dec 6 14:19:12 2016 -0800
Committer: Luke Cwik <lcwik@google.com>
Committed: Tue Dec 20 13:04:24 2016 -0800

----------------------------------------------------------------------
 .../beam/runners/dataflow/DataflowRunner.java   | 25 ++++++++----
 .../options/DataflowPipelineOptions.java        | 19 ++++-----
 .../runners/dataflow/DataflowRunnerTest.java    | 42 +++++++++++++++-----
 .../options/DataflowPipelineOptionsTest.java    | 20 ++++++----
 .../org/apache/beam/sdk/options/GcpOptions.java | 19 +++++----
 .../apache/beam/sdk/util/GcsPathValidator.java  |  3 +-
 .../apache/beam/sdk/options/GcpOptionsTest.java | 32 +++++++++++++--
 .../beam/sdk/util/GcsPathValidatorTest.java     | 15 +------
 8 files changed, 114 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ef1a8583/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 711b1b0..1a15eaf 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -244,14 +244,23 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
 
     PathValidator validator = dataflowOptions.getPathValidator();
-    checkArgument(
-        !isNullOrEmpty(dataflowOptions.getGcpTempLocation()),
-        "DataflowRunner requires gcpTempLocation, and it is missing in PipelineOptions.");
-    validator.validateOutputFilePrefixSupported(dataflowOptions.getGcpTempLocation());
-    checkArgument(
-        !isNullOrEmpty(dataflowOptions.getStagingLocation()),
-        "DataflowRunner requires stagingLocation, and it is missing in PipelineOptions.");
-    validator.validateOutputFilePrefixSupported(dataflowOptions.getStagingLocation());
+    String gcpTempLocation;
+    try {
+      gcpTempLocation = dataflowOptions.getGcpTempLocation();
+    } catch (Exception e) {
+      throw new IllegalArgumentException("DataflowRunner requires gcpTempLocation, "
+          + "but failed to retrieve a value from PipelineOptions", e);
+    }
+    validator.validateOutputFilePrefixSupported(gcpTempLocation);
+
+    String stagingLocation;
+    try {
+      stagingLocation = dataflowOptions.getStagingLocation();
+    } catch (Exception e) {
+      throw new IllegalArgumentException("DataflowRunner requires stagingLocation, "
+          + "but failed to retrieve a value from PipelineOptions", e);
+    }
+    validator.validateOutputFilePrefixSupported(stagingLocation);
 
     if (!Strings.isNullOrEmpty(dataflowOptions.getSaveProfilesToGcs())) {
       validator.validateOutputFilePrefixSupported(dataflowOptions.getSaveProfilesToGcs());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ef1a8583/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
index 66632ad..5ddc5d0 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
@@ -17,9 +17,6 @@
  */
 package org.apache.beam.runners.dataflow.options;
 
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Strings.isNullOrEmpty;
-
 import java.io.IOException;
 import org.apache.beam.runners.dataflow.DataflowRunner;
 import org.apache.beam.sdk.annotations.Experimental;
@@ -107,17 +104,21 @@ public interface DataflowPipelineOptions
     @Override
     public String create(PipelineOptions options) {
       GcsOptions gcsOptions = options.as(GcsOptions.class);
-      String gcpTempLocation = gcsOptions.getGcpTempLocation();
-      checkArgument(!isNullOrEmpty(gcpTempLocation),
-          "Error constructing default value for stagingLocation: gcpTempLocation is missing."
-          + "Either stagingLocation must be set explicitly or a valid value must be provided"
-          + "for gcpTempLocation.");
+      String gcpTempLocation;
+      try {
+        gcpTempLocation = gcsOptions.getGcpTempLocation();
+      } catch (Exception e) {
+        throw new IllegalArgumentException(
+        "Error constructing default value for stagingLocation: failed to retrieve gcpTempLocation. "
+            + "Either stagingLocation must be set explicitly or a valid value must be provided"
+            + "for gcpTempLocation.", e);
+      }
       try {
         gcsOptions.getPathValidator().validateOutputFilePrefixSupported(gcpTempLocation);
       } catch (Exception e) {
         throw new IllegalArgumentException(String.format(
             "Error constructing default value for stagingLocation: gcpTempLocation is not"
-            + " a valid GCS path, %s. ", gcpTempLocation));
+            + " a valid GCS path, %s. ", gcpTempLocation), e);
       }
       try {
         return IOChannelUtils.resolve(gcpTempLocation, "staging");

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ef1a8583/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 4159b61..b29c4cd 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.dataflow;
 
 import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow;
+import static org.hamcrest.Matchers.both;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
@@ -280,6 +281,26 @@ public class DataflowRunnerTest {
   }
 
   @Test
+  public void testPathExistsValidation() {
+    String[] args = new String[] {
+        "--runner=DataflowRunner",
+        "--tempLocation=gs://does/not/exist",
+        "--project=test-project",
+        "--credentialFactoryClass=" + NoopCredentialFactory.class.getCanonicalName(),
+    };
+
+    try {
+      TestPipeline.fromOptions(PipelineOptionsFactory.fromArgs(args).create());
+      fail();
+    } catch (RuntimeException e) {
+      assertThat(
+          Throwables.getStackTraceAsString(e),
+          both(containsString("gs://does/not/exist"))
+              .and(containsString("does not exist or is not writeable")));
+    }
+  }
+
+  @Test
   public void testPathValidatorOverride() {
     String[] args = new String[] {
         "--runner=DataflowRunner",
@@ -544,7 +565,7 @@ public class DataflowRunnerTest {
     thrown.expectCause(Matchers.allOf(
         instanceOf(IllegalArgumentException.class),
         ThrowableMessageMatcher.hasMessage(
-            containsString("expected a valid 'gs://' path but was given"))));
+            containsString("Expected a valid 'gs://' path but was given"))));
     p.run();
 
     ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
@@ -559,7 +580,7 @@ public class DataflowRunnerTest {
     PCollection<String> pc = p.apply("ReadMyGcsFile", TextIO.Read.from("gs://bucket/object"));
 
     thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage(containsString("expected a valid 'gs://' path but was given"));
+    thrown.expectMessage(containsString("Expected a valid 'gs://' path but was given"));
     pc.apply("WriteMyNonGcsFile", TextIO.Write.to("/tmp/file"));
   }
 
@@ -594,7 +615,7 @@ public class DataflowRunnerTest {
     options.setGcpTempLocation("file://temp/location");
 
     thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage(containsString("expected a valid 'gs://' path but was given"));
+    thrown.expectMessage(containsString("Expected a valid 'gs://' path but was given"));
     DataflowRunner.fromOptions(options);
 
     ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
@@ -609,7 +630,8 @@ public class DataflowRunnerTest {
 
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage(
-        "DataflowRunner requires gcpTempLocation, and it is missing in PipelineOptions.");
+        "DataflowRunner requires gcpTempLocation, "
+            + "but failed to retrieve a value from PipelineOptions");
     DataflowRunner.fromOptions(options);
   }
 
@@ -621,14 +643,14 @@ public class DataflowRunnerTest {
       DataflowRunner.fromOptions(options);
       fail("fromOptions should have failed");
     } catch (IllegalArgumentException e) {
-      assertThat(e.getMessage(), containsString("expected a valid 'gs://' path but was given"));
+      assertThat(e.getMessage(), containsString("Expected a valid 'gs://' path but was given"));
     }
     options.setStagingLocation("my/staging/location");
     try {
       DataflowRunner.fromOptions(options);
       fail("fromOptions should have failed");
     } catch (IllegalArgumentException e) {
-      assertThat(e.getMessage(), containsString("expected a valid 'gs://' path but was given"));
+      assertThat(e.getMessage(), containsString("Expected a valid 'gs://' path but was given"));
     }
   }
 
@@ -640,14 +662,14 @@ public class DataflowRunnerTest {
       DataflowRunner.fromOptions(options);
       fail("fromOptions should have failed");
     } catch (IllegalArgumentException e) {
-      assertThat(e.getMessage(), containsString("expected a valid 'gs://' path but was given"));
+      assertThat(e.getMessage(), containsString("Expected a valid 'gs://' path but was given"));
     }
     options.setSaveProfilesToGcs("my/staging/location");
     try {
       DataflowRunner.fromOptions(options);
       fail("fromOptions should have failed");
     } catch (IllegalArgumentException e) {
-      assertThat(e.getMessage(), containsString("expected a valid 'gs://' path but was given"));
+      assertThat(e.getMessage(), containsString("Expected a valid 'gs://' path but was given"));
     }
   }
 
@@ -795,8 +817,8 @@ public class DataflowRunnerTest {
     options.setProject("foo-project");
 
     thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage(
-        "DataflowRunner requires gcpTempLocation, and it is missing in PipelineOptions.");
+    thrown.expectMessage("DataflowRunner requires gcpTempLocation, "
+        + "but failed to retrieve a value from PipelineOption");
     DataflowRunner.fromOptions(options);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ef1a8583/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
index 52082e0..9dacfb2 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
@@ -17,9 +17,10 @@
  */
 package org.apache.beam.runners.dataflow.options;
 
-import static com.google.common.base.Strings.isNullOrEmpty;
+import static org.hamcrest.Matchers.containsString;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.internal.matchers.ThrowableMessageMatcher.hasMessage;
 
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.ResetDateTimeProvider;
@@ -127,9 +128,10 @@ public class DataflowPipelineOptionsTest {
   public void testStagingLocation() {
     DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
     IOChannelUtils.registerIOFactoriesAllowOverride(options);
-    options.setTempLocation("file://temp_location");
+    options.setPathValidatorClass(NoopPathValidator.class);
+    options.setTempLocation("gs://temp_location");
     options.setStagingLocation("gs://staging_location");
-    assertTrue(isNullOrEmpty(options.getGcpTempLocation()));
+    assertEquals("gs://temp_location", options.getGcpTempLocation());
     assertEquals("gs://staging_location", options.getStagingLocation());
   }
 
@@ -158,8 +160,10 @@ public class DataflowPipelineOptionsTest {
     DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
     options.setTempLocation("file://temp_location");
     thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage(
-        "Error constructing default value for stagingLocation: gcpTempLocation is missing.");
+    thrown.expectMessage("Error constructing default value for stagingLocation: "
+        + "failed to retrieve gcpTempLocation.");
+    thrown.expectCause(hasMessage(containsString(
+        "Error constructing default value for gcpTempLocation")));
     options.getStagingLocation();
   }
 
@@ -171,6 +175,8 @@ public class DataflowPipelineOptionsTest {
     thrown.expectMessage(
         "Error constructing default value for stagingLocation: gcpTempLocation is not"
         + " a valid GCS path");
+    thrown.expectCause(
+        hasMessage(containsString("Expected a valid 'gs://' path")));
     options.getStagingLocation();
   }
 
@@ -178,8 +184,8 @@ public class DataflowPipelineOptionsTest {
   public void testDefaultStagingLocationUnset() {
     DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
     thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage(
-        "Error constructing default value for stagingLocation: gcpTempLocation is missing.");
+    thrown.expectMessage("Error constructing default value for stagingLocation: "
+        + "failed to retrieve gcpTempLocation.");
     options.getStagingLocation();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ef1a8583/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java
index ffdab98..042f4b4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.options;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.google.auth.Credentials;
 import com.google.common.annotations.VisibleForTesting;
@@ -195,14 +197,15 @@ public interface GcpOptions extends GoogleApiDebugOptions, PipelineOptions {
     @Nullable
     public String create(PipelineOptions options) {
       String tempLocation = options.getTempLocation();
-      if (!Strings.isNullOrEmpty(tempLocation)) {
-        try {
-          PathValidator validator = options.as(GcsOptions.class).getPathValidator();
-          validator.validateOutputFilePrefixSupported(tempLocation);
-        } catch (Exception e) {
-          // Ignore the temp location because it is not a valid 'gs://' path.
-          return null;
-        }
+      checkArgument(!Strings.isNullOrEmpty(options.getTempLocation()),
+          "Error constructing default value for gcpTempLocation: tempLocation is not set");
+      try {
+        PathValidator validator = options.as(GcsOptions.class).getPathValidator();
+        validator.validateOutputFilePrefixSupported(tempLocation);
+      } catch (Exception e) {
+        throw new IllegalArgumentException(String.format(
+            "Error constructing default value for gcpTempLocation: tempLocation is not"
+                + " a valid GCS path, %s. ", tempLocation), e);
       }
       return tempLocation;
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ef1a8583/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java
index c8da4d8..a5b951d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java
@@ -89,8 +89,7 @@ public class GcsPathValidator implements PathValidator {
       return GcsPath.fromUri(path);
     } catch (IllegalArgumentException e) {
       throw new IllegalArgumentException(String.format(
-          "%s expected a valid 'gs://' path but was given '%s'",
-          gcpOptions.getRunner().getSimpleName(), path), e);
+          "Expected a valid 'gs://' path but was given '%s'", path), e);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ef1a8583/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java
index 34077a2..7854d67 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java
@@ -17,10 +17,10 @@
  */
 package org.apache.beam.sdk.options;
 
-import static com.google.common.base.Strings.isNullOrEmpty;
+import static org.hamcrest.Matchers.containsString;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
+import static org.junit.internal.matchers.ThrowableMessageMatcher.hasMessage;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
@@ -35,6 +35,7 @@ import org.apache.beam.sdk.testing.RestoreSystemProperties;
 import org.apache.beam.sdk.util.NoopPathValidator;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.junit.rules.TemporaryFolder;
 import org.junit.rules.TestRule;
 import org.junit.runner.RunWith;
@@ -45,6 +46,7 @@ import org.junit.runners.JUnit4;
 public class GcpOptionsTest {
   @Rule public TestRule restoreSystemProperties = new RestoreSystemProperties();
   @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
+  @Rule public ExpectedException thrown = ExpectedException.none();
 
   @Test
   public void testGetProjectFromCloudSdkConfigEnv() throws Exception {
@@ -107,7 +109,10 @@ public class GcpOptionsTest {
   @Test
   public void testEmptyGcpTempLocation() throws Exception {
     GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
-    assertTrue(isNullOrEmpty(options.getGcpTempLocation()));
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(
+        "Error constructing default value for gcpTempLocation: tempLocation is not set");
+    options.getGcpTempLocation();
   }
 
   @Test
@@ -123,7 +128,26 @@ public class GcpOptionsTest {
   public void testDefaultGcpTempLocationInvalid() throws Exception {
     GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
     options.setTempLocation("file://");
-    assertTrue(isNullOrEmpty(options.getGcpTempLocation()));
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(
+        "Error constructing default value for gcpTempLocation: tempLocation is not"
+            + " a valid GCS path");
+    options.getGcpTempLocation();
+  }
+
+  @Test
+  public void testDefaultGcpTempLocationDoesNotExist() {
+    GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
+    String tempLocation = "gs://does/not/exist";
+    options.setTempLocation(tempLocation);
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(
+        "Error constructing default value for gcpTempLocation: tempLocation is not"
+            + " a valid GCS path");
+    thrown.expectCause(
+        hasMessage(containsString("Output path does not exist or is not writeable")));
+
+    options.getGcpTempLocation();
   }
 
   private static void makePropertiesFileWithProject(File path, String projectId)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ef1a8583/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsPathValidatorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsPathValidatorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsPathValidatorTest.java
index 16f01a0..286490d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsPathValidatorTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsPathValidatorTest.java
@@ -21,11 +21,8 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.when;
 
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.options.GcsOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.util.gcsfs.GcsPath;
 import org.junit.Before;
 import org.junit.Rule;
@@ -44,20 +41,12 @@ public class GcsPathValidatorTest {
   @Mock private GcsUtil mockGcsUtil;
   private GcsPathValidator validator;
 
-  private class FakeRunner extends PipelineRunner<PipelineResult> {
-    @Override
-    public PipelineResult run(Pipeline pipeline) {
-      throw new UnsupportedOperationException();
-    }
-  }
-
   @Before
   public void setUp() throws Exception {
     MockitoAnnotations.initMocks(this);
     when(mockGcsUtil.bucketAccessible(any(GcsPath.class))).thenReturn(true);
     when(mockGcsUtil.isGcsPatternSupported(anyString())).thenCallRealMethod();
     GcsOptions options = PipelineOptionsFactory.as(GcsOptions.class);
-    options.setRunner(FakeRunner.class);
     options.setGcpCredential(new TestCredential());
     options.setGcsUtil(mockGcsUtil);
     validator = GcsPathValidator.fromOptions(options);
@@ -72,7 +61,7 @@ public class GcsPathValidatorTest {
   public void testInvalidFilePattern() {
     expectedException.expect(IllegalArgumentException.class);
     expectedException.expectMessage(
-        "FakeRunner expected a valid 'gs://' path but was given '/local/path'");
+        "Expected a valid 'gs://' path but was given '/local/path'");
     validator.validateInputFilePatternSupported("/local/path");
   }
 
@@ -94,7 +83,7 @@ public class GcsPathValidatorTest {
   public void testInvalidOutputPrefix() {
     expectedException.expect(IllegalArgumentException.class);
     expectedException.expectMessage(
-        "FakeRunner expected a valid 'gs://' path but was given '/local/path'");
+        "Expected a valid 'gs://' path but was given '/local/path'");
     validator.validateOutputFilePrefixSupported("/local/path");
   }
 }


Mime
View raw message