beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [1/2] beam git commit: Eliminate Pipeline.getOptions
Date Thu, 04 May 2017 20:24:34 GMT
Repository: beam
Updated Branches:
  refs/heads/master 70e53e7dc -> 1b363ae9f


Eliminate Pipeline.getOptions


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/55351dce
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/55351dce
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/55351dce

Branch: refs/heads/master
Commit: 55351dcebec8ba9e166c4f90555edca6b90b1b14
Parents: 70e53e7
Author: Kenneth Knowles <klk@google.com>
Authored: Wed May 3 03:06:36 2017 -0700
Committer: Kenneth Knowles <klk@google.com>
Committed: Thu May 4 13:00:15 2017 -0700

----------------------------------------------------------------------
 .../beam/examples/complete/game/GameStats.java  |  7 +++-
 .../examples/complete/game/HourlyTeamScore.java |  6 ++-
 .../examples/complete/game/LeaderBoard.java     | 26 ++++++++----
 .../beam/examples/complete/game/UserScore.java  | 20 +++++----
 .../complete/game/utils/WriteToBigQuery.java    | 32 +++++++-------
 .../game/utils/WriteWindowedToBigQuery.java     |  8 ++--
 .../beam/runners/direct/DirectRunner.java       |  1 +
 .../runners/direct/DisplayDataValidator.java    |  6 +--
 .../dataflow/testing/TestDataflowRunner.java    |  3 +-
 .../testing/TestDataflowRunnerTest.java         | 18 +++-----
 .../apache/beam/runners/spark/SparkRunner.java  |  4 +-
 .../beam/runners/spark/SparkRunnerDebugger.java |  4 +-
 .../beam/runners/spark/TestSparkRunner.java     |  5 +--
 .../spark/translation/EvaluationContext.java    | 16 +++++--
 .../spark/translation/SparkRuntimeContext.java  |  4 +-
 .../SparkRunnerStreamingContextFactory.java     |  2 +-
 .../apache/beam/runners/spark/CacheTest.java    |  2 +-
 .../streaming/TrackStreamingSourcesTest.java    |  2 +-
 .../main/java/org/apache/beam/sdk/Pipeline.java | 14 +------
 .../apache/beam/sdk/testing/TestPipeline.java   | 26 +++++-------
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 44 ++++++++++----------
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     |  1 +
 22 files changed, 131 insertions(+), 120 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/55351dce/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
index a46d3c5..abbb13b 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
@@ -24,6 +24,7 @@ import org.apache.beam.examples.common.ExampleUtils;
 import org.apache.beam.examples.complete.game.utils.WriteWindowedToBigQuery;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Metrics;
@@ -300,6 +301,8 @@ public class GameStats extends LeaderBoard {
       // Write the result to BigQuery
       .apply("WriteTeamSums",
           new WriteWindowedToBigQuery<KV<String, Integer>>(
+              options.as(GcpOptions.class).getProject(),
+              options.getDataset(),
               options.getGameStatsTablePrefix() + "_team", configureWindowedWrite()));
 
 
@@ -327,7 +330,9 @@ public class GameStats extends LeaderBoard {
       // Write this info to a BigQuery table.
       .apply("WriteAvgSessionLength",
              new WriteWindowedToBigQuery<Double>(
-                options.getGameStatsTablePrefix() + "_sessions", configureSessionWindowWrite()));
+                 options.as(GcpOptions.class).getProject(),
+                 options.getDataset(),
+                 options.getGameStatsTablePrefix() + "_sessions", configureSessionWindowWrite()));
     // [END DocInclude_Rewindow]
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/55351dce/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
index 3f1ffb0..2928882 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
@@ -22,6 +22,7 @@ import java.util.Map;
 import java.util.TimeZone;
 import org.apache.beam.examples.complete.game.utils.WriteWindowedToBigQuery;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
@@ -185,7 +186,10 @@ public class HourlyTeamScore extends UserScore {
       // Extract and sum teamname/score pairs from the event data.
       .apply("ExtractTeamScore", new ExtractAndSumScore("team"))
       .apply("WriteTeamScoreSums",
-        new WriteWindowedToBigQuery<KV<String, Integer>>(options.getHourlyTeamScoreTableName(),
+        new WriteWindowedToBigQuery<KV<String, Integer>>(
+            options.as(GcpOptions.class).getProject(),
+            options.getDataset(),
+            options.getHourlyTeamScoreTableName(),
             configureWindowedTableWrite()));
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/55351dce/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
index 9af34c5..bfad9f6 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
@@ -27,6 +27,7 @@ import org.apache.beam.examples.complete.game.utils.WriteToBigQuery;
 import org.apache.beam.examples.complete.game.utils.WriteWindowedToBigQuery;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
@@ -194,14 +195,20 @@ public class LeaderBoard extends HourlyTeamScore {
             .withTimestampAttribute(TIMESTAMP_ATTRIBUTE).fromTopic(options.getTopic()))
         .apply("ParseGameEvent", ParDo.of(new ParseEventFn()));
 
-    gameEvents.apply("CalculateTeamScores",
-        new CalculateTeamScores(
-            Duration.standardMinutes(options.getTeamWindowDuration()),
-            Duration.standardMinutes(options.getAllowedLateness())))
+    gameEvents
+        .apply(
+            "CalculateTeamScores",
+            new CalculateTeamScores(
+                Duration.standardMinutes(options.getTeamWindowDuration()),
+                Duration.standardMinutes(options.getAllowedLateness())))
         // Write the results to BigQuery.
-        .apply("WriteTeamScoreSums",
-               new WriteWindowedToBigQuery<KV<String, Integer>>(
-                  options.getLeaderBoardTableName() + "_team", configureWindowedTableWrite()));
+        .apply(
+            "WriteTeamScoreSums",
+            new WriteWindowedToBigQuery<KV<String, Integer>>(
+                options.as(GcpOptions.class).getProject(),
+                options.getDataset(),
+                options.getLeaderBoardTableName() + "_team",
+                configureWindowedTableWrite()));
     gameEvents
         .apply(
             "CalculateUserScores",
@@ -210,7 +217,10 @@ public class LeaderBoard extends HourlyTeamScore {
         .apply(
             "WriteUserScoreSums",
             new WriteToBigQuery<KV<String, Integer>>(
-                options.getLeaderBoardTableName() + "_user", configureGlobalWindowBigQueryWrite()));
+                options.as(GcpOptions.class).getProject(),
+                options.getDataset(),
+                options.getLeaderBoardTableName() + "_user",
+                configureGlobalWindowBigQueryWrite()));
 
    // Run the pipeline and wait for the pipeline to finish; capture cancellation requests from the
    // command line.

http://git-wip-us.apache.org/repos/asf/beam/blob/55351dce/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
index c136c2e..8110146 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
@@ -24,6 +24,7 @@ import org.apache.beam.examples.complete.game.utils.WriteToBigQuery;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.DefaultCoder;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Metrics;
@@ -226,13 +227,18 @@ public class UserScore {
     Pipeline pipeline = Pipeline.create(options);
 
     // Read events from a text file and parse them.
-    pipeline.apply(TextIO.read().from(options.getInput()))
-      .apply("ParseGameEvent", ParDo.of(new ParseEventFn()))
-      // Extract and sum username/score pairs from the event data.
-      .apply("ExtractUserScore", new ExtractAndSumScore("user"))
-      .apply("WriteUserScoreSums",
-          new WriteToBigQuery<KV<String, Integer>>(options.getUserScoreTableName(),
-                                                   configureBigQueryWrite()));
+    pipeline
+        .apply(TextIO.read().from(options.getInput()))
+        .apply("ParseGameEvent", ParDo.of(new ParseEventFn()))
+        // Extract and sum username/score pairs from the event data.
+        .apply("ExtractUserScore", new ExtractAndSumScore("user"))
+        .apply(
+            "WriteUserScoreSums",
+            new WriteToBigQuery<KV<String, Integer>>(
+                options.as(GcpOptions.class).getProject(),
+                options.getDataset(),
+                options.getUserScoreTableName(),
+                configureBigQueryWrite()));
 
     // Run the batch pipeline.
     pipeline.run().waitUntilFinish();

http://git-wip-us.apache.org/repos/asf/beam/blob/55351dce/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
index f767d21..2ec4e5c 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
@@ -25,13 +25,9 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import org.apache.beam.examples.complete.game.UserScore;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
-import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -47,14 +43,21 @@ import org.apache.beam.sdk.values.PDone;
 public class WriteToBigQuery<InputT>
     extends PTransform<PCollection<InputT>, PDone> {
 
+  protected String projectId;
+  protected String datasetId;
   protected String tableName;
   protected Map<String, FieldInfo<InputT>> fieldInfo;
 
   public WriteToBigQuery() {
   }
 
-  public WriteToBigQuery(String tableName,
+  public WriteToBigQuery(
+      String projectId,
+      String datasetId,
+      String tableName,
       Map<String, FieldInfo<InputT>> fieldInfo) {
+    this.projectId = projectId;
+    this.datasetId = datasetId;
     this.tableName = tableName;
     this.fieldInfo = fieldInfo;
   }
@@ -120,20 +123,21 @@ public class WriteToBigQuery<InputT>
   @Override
   public PDone expand(PCollection<InputT> teamAndScore) {
     teamAndScore
-      .apply("ConvertToRow", ParDo.of(new BuildRowFn()))
-      .apply(BigQueryIO.writeTableRows().to(getTable(teamAndScore.getPipeline(), tableName))
-          .withSchema(getSchema())
-          .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
-          .withWriteDisposition(WriteDisposition.WRITE_APPEND));
+        .apply("ConvertToRow", ParDo.of(new BuildRowFn()))
+        .apply(
+            BigQueryIO.writeTableRows()
+                .to(getTable(projectId, datasetId, tableName))
+                .withSchema(getSchema())
+                .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
+                .withWriteDisposition(WriteDisposition.WRITE_APPEND));
     return PDone.in(teamAndScore.getPipeline());
   }
 
   /** Utility to construct an output table reference. */
-  static TableReference getTable(Pipeline pipeline, String tableName) {
-    PipelineOptions options = pipeline.getOptions();
+  static TableReference getTable(String projectId, String datasetId, String tableName) {
     TableReference table = new TableReference();
-    table.setDatasetId(options.as(UserScore.Options.class).getDataset());
-    table.setProjectId(options.as(GcpOptions.class).getProject());
+    table.setDatasetId(datasetId);
+    table.setProjectId(projectId);
     table.setTableId(tableName);
     return table;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/55351dce/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
index e602258..deb9db2 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
@@ -37,9 +37,9 @@ import org.apache.beam.sdk.values.PDone;
 public class WriteWindowedToBigQuery<T>
     extends WriteToBigQuery<T> {
 
-  public WriteWindowedToBigQuery(String tableName,
-      Map<String, FieldInfo<T>> fieldInfo) {
-    super(tableName, fieldInfo);
+  public WriteWindowedToBigQuery(
+      String projectId, String datasetId, String tableName, Map<String, FieldInfo<T>> fieldInfo) {
+    super(projectId, datasetId, tableName, fieldInfo);
   }
 
   /** Convert each key/score pair into a BigQuery TableRow. */
@@ -62,7 +62,7 @@ public class WriteWindowedToBigQuery<T>
     teamAndScore
       .apply("ConvertToRow", ParDo.of(new BuildRowFn()))
       .apply(BigQueryIO.writeTableRows()
-                .to(getTable(teamAndScore.getPipeline(), tableName))
+                .to(getTable(projectId, datasetId, tableName))
                 .withSchema(getSchema())
                 .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
                 .withWriteDisposition(WriteDisposition.WRITE_APPEND));

http://git-wip-us.apache.org/repos/asf/beam/blob/55351dce/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index c6168b3e..984598a 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -163,6 +163,7 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult>
{
     pipeline.traverseTopologically(keyedPValueVisitor);
 
     DisplayDataValidator.validatePipeline(pipeline);
+    DisplayDataValidator.validateOptions(getPipelineOptions());
 
     DirectGraph graph = graphVisitor.getGraph();
     EvaluationContext context =

http://git-wip-us.apache.org/repos/asf/beam/blob/55351dce/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DisplayDataValidator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DisplayDataValidator.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DisplayDataValidator.java
index c77cb48..209c801 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DisplayDataValidator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DisplayDataValidator.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.direct;
 
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
@@ -31,12 +32,11 @@ class DisplayDataValidator {
   private DisplayDataValidator() {}
 
   static void validatePipeline(Pipeline pipeline) {
-    validateOptions(pipeline);
     validateTransforms(pipeline);
   }
 
-  private static void validateOptions(Pipeline pipeline) {
-    evaluateDisplayData(pipeline.getOptions());
+  static void validateOptions(PipelineOptions options) {
+    evaluateDisplayData(options);
   }
 
   private static void validateTransforms(Pipeline pipeline) {

http://git-wip-us.apache.org/repos/asf/beam/blob/55351dce/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
index ba9d971..c238d80 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
@@ -98,7 +98,7 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob>
{
   DataflowPipelineJob run(Pipeline pipeline, DataflowRunner runner) {
     updatePAssertCount(pipeline);
 
-    TestPipelineOptions testPipelineOptions = pipeline.getOptions().as(TestPipelineOptions.class);
+    TestPipelineOptions testPipelineOptions = options.as(TestPipelineOptions.class);
     final DataflowPipelineJob job;
     job = runner.run(pipeline);
 
@@ -188,7 +188,6 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob>
{
 
   @VisibleForTesting
   void updatePAssertCount(Pipeline pipeline) {
-    DataflowPipelineOptions options = pipeline.getOptions().as(DataflowPipelineOptions.class);
     if (DataflowRunner.hasExperiment(options, "beam_fn_api")) {
       // TODO[BEAM-1866]: FnAPI does not support metrics, so expect 0 assertions.
       expectedNumberOfAssertions = 0;

http://git-wip-us.apache.org/repos/asf/beam/blob/55351dce/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
index 54eb88d..eb068e6 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
@@ -464,8 +464,7 @@ public class TestDataflowRunnerTest {
     when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
 
     TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
-    p.getOptions().as(TestPipelineOptions.class)
-        .setOnCreateMatcher(new TestSuccessMatcher(mockJob, 0));
+    options.as(TestPipelineOptions.class).setOnCreateMatcher(new TestSuccessMatcher(mockJob, 0));
 
     when(mockClient.getJobMetrics(anyString()))
         .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */));
@@ -488,8 +487,7 @@ public class TestDataflowRunnerTest {
     when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
 
     TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
-    p.getOptions().as(TestPipelineOptions.class)
-        .setOnCreateMatcher(new TestSuccessMatcher(mockJob, 0));
+    options.as(TestPipelineOptions.class).setOnCreateMatcher(new TestSuccessMatcher(mockJob, 0));
 
     when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
         .thenReturn(State.DONE);
@@ -515,8 +513,7 @@ public class TestDataflowRunnerTest {
     when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
 
     TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
-    p.getOptions().as(TestPipelineOptions.class)
-        .setOnSuccessMatcher(new TestSuccessMatcher(mockJob, 1));
+    options.as(TestPipelineOptions.class).setOnSuccessMatcher(new TestSuccessMatcher(mockJob, 1));
 
     when(mockClient.getJobMetrics(anyString()))
         .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */));
@@ -544,8 +541,7 @@ public class TestDataflowRunnerTest {
     when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
 
     TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
-    p.getOptions().as(TestPipelineOptions.class)
-        .setOnSuccessMatcher(new TestSuccessMatcher(mockJob, 1));
+    options.as(TestPipelineOptions.class).setOnSuccessMatcher(new TestSuccessMatcher(mockJob, 1));
 
     when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
         .thenReturn(State.DONE);
@@ -570,8 +566,7 @@ public class TestDataflowRunnerTest {
     when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
 
     TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
-    p.getOptions().as(TestPipelineOptions.class)
-        .setOnSuccessMatcher(new TestFailureMatcher());
+    options.as(TestPipelineOptions.class).setOnSuccessMatcher(new TestFailureMatcher());
 
     when(mockClient.getJobMetrics(anyString()))
         .thenReturn(generateMockMetricResponse(false /* success */, true /* tentative */));
@@ -606,8 +601,7 @@ public class TestDataflowRunnerTest {
     when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
 
     TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
-    p.getOptions().as(TestPipelineOptions.class)
-        .setOnSuccessMatcher(new TestFailureMatcher());
+    options.as(TestPipelineOptions.class).setOnSuccessMatcher(new TestFailureMatcher());
 
     when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
         .thenReturn(State.FAILED);

http://git-wip-us.apache.org/repos/asf/beam/blob/55351dce/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index 97487f3..1a0c042 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -193,7 +193,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult>
{
     } else {
       // create the evaluation context
       final JavaSparkContext jsc = SparkContextFactory.getSparkContext(mOptions);
-      final EvaluationContext evaluationContext = new EvaluationContext(jsc, pipeline);
+      final EvaluationContext evaluationContext = new EvaluationContext(jsc, pipeline, mOptions);
       translator = new TransformTranslator.Translator();
 
       // update the cache candidates
@@ -383,7 +383,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult>
{
         LOG.info(
             "Deferring combine transformation {} for job {}",
             transform,
-            ctxt.getPipeline().getOptions().getJobName());
+            ctxt.getOptions().getJobName());
         return true;
       }
       // default.

http://git-wip-us.apache.org/repos/asf/beam/blob/55351dce/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java
index 7f7aefc..8d47e1a 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java
@@ -89,10 +89,10 @@ public final class SparkRunnerDebugger extends PipelineRunner<SparkPipelineResul
         && ((TestSparkPipelineOptions) options).isForceStreaming()) {
       SparkPipelineTranslator streamingTranslator =
           new StreamingTransformTranslator.Translator(translator);
-      EvaluationContext ctxt = new EvaluationContext(jsc, pipeline, jssc);
+      EvaluationContext ctxt = new EvaluationContext(jsc, pipeline, options, jssc);
       visitor = new SparkNativePipelineVisitor(streamingTranslator, ctxt);
     } else {
-      EvaluationContext ctxt = new EvaluationContext(jsc, pipeline, jssc);
+      EvaluationContext ctxt = new EvaluationContext(jsc, pipeline, options, jssc);
       visitor = new SparkNativePipelineVisitor(translator, ctxt);
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/55351dce/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
index 1e67813..6808d7b 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
@@ -83,6 +83,7 @@ import org.slf4j.LoggerFactory;
 public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult> {
 
   private static final Logger LOG = LoggerFactory.getLogger(TestSparkRunner.class);
+  private final TestSparkPipelineOptions testSparkPipelineOptions;
 
   private SparkRunner delegate;
   private boolean isForceStreaming;
@@ -90,6 +91,7 @@ public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult>
{
   private TestSparkRunner(TestSparkPipelineOptions options) {
     this.delegate = SparkRunner.fromOptions(options);
     this.isForceStreaming = options.isForceStreaming();
+    this.testSparkPipelineOptions = options;
   }
 
   public static TestSparkRunner fromOptions(PipelineOptions options) {
@@ -101,9 +103,6 @@ public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult>
{
 
   @Override
   public SparkPipelineResult run(Pipeline pipeline) {
-    TestSparkPipelineOptions testSparkPipelineOptions =
-        pipeline.getOptions().as(TestSparkPipelineOptions.class);
-    //
     // if the pipeline forces execution as a streaming pipeline,
     // and the source is an adapted unbounded source (as bounded),
     // read it as unbounded source via UnboundedReadFromBoundedSource.

http://git-wip-us.apache.org/repos/asf/beam/blob/55351dce/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
index 838c504..5d77e91 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
@@ -30,6 +30,7 @@ import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.coders.CoderHelpers;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -57,15 +58,18 @@ public class EvaluationContext {
   private AppliedPTransform<?, ?, ?> currentTransform;
   private final SparkPCollectionView pviews = new SparkPCollectionView();
   private final Map<PCollection, Long> cacheCandidates = new HashMap<>();
+  private final PipelineOptions options;
 
-  public EvaluationContext(JavaSparkContext jsc, Pipeline pipeline) {
+  public EvaluationContext(JavaSparkContext jsc, Pipeline pipeline, PipelineOptions options) {
     this.jsc = jsc;
     this.pipeline = pipeline;
-    this.runtime = new SparkRuntimeContext(pipeline);
+    this.options = options;
+    this.runtime = new SparkRuntimeContext(pipeline, options);
   }
 
-  public EvaluationContext(JavaSparkContext jsc, Pipeline pipeline, JavaStreamingContext jssc) {
-    this(jsc, pipeline);
+  public EvaluationContext(
+      JavaSparkContext jsc, Pipeline pipeline, PipelineOptions options, JavaStreamingContext jssc) {
+    this(jsc, pipeline, options);
     this.jssc = jssc;
   }
 
@@ -81,6 +85,10 @@ public class EvaluationContext {
     return pipeline;
   }
 
+  public PipelineOptions getOptions() {
+    return options;
+  }
+
   public SparkRuntimeContext getRuntimeContext() {
     return runtime;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/55351dce/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
index e006143..3db1ab5 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
@@ -35,8 +35,8 @@ public class SparkRuntimeContext implements Serializable {
   private final String serializedPipelineOptions;
   private transient CoderRegistry coderRegistry;
 
-  SparkRuntimeContext(Pipeline pipeline) {
-    this.serializedPipelineOptions = serializePipelineOptions(pipeline.getOptions());
+  SparkRuntimeContext(Pipeline pipeline, PipelineOptions options) {
+    this.serializedPipelineOptions = serializePipelineOptions(options);
   }
 
   private String serializePipelineOptions(PipelineOptions pipelineOptions) {

http://git-wip-us.apache.org/repos/asf/beam/blob/55351dce/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
index 2dd18f3..6a153ff 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
@@ -82,7 +82,7 @@ public class SparkRunnerStreamingContextFactory implements Function0<JavaStreami
     // We must first init accumulators since translators expect them to be instantiated.
     SparkRunner.initAccumulators(options, jsc);
 
-    EvaluationContext ctxt = new EvaluationContext(jsc, pipeline, jssc);
+    EvaluationContext ctxt = new EvaluationContext(jsc, pipeline, options, jssc);
     // update cache candidates
     SparkRunner.updateCacheCandidates(pipeline, translator, ctxt);
     pipeline.traverseTopologically(new SparkRunner.Evaluator(translator, ctxt));

http://git-wip-us.apache.org/repos/asf/beam/blob/55351dce/runners/spark/src/test/java/org/apache/beam/runners/spark/CacheTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/CacheTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/CacheTest.java
index 24b2e7b..d3d0823 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/CacheTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/CacheTest.java
@@ -51,7 +51,7 @@ public class CacheTest {
     pCollection.apply(Count.<String>globally());
 
     JavaSparkContext jsc = SparkContextFactory.getSparkContext(options);
-    EvaluationContext ctxt = new EvaluationContext(jsc, pipeline);
+    EvaluationContext ctxt = new EvaluationContext(jsc, pipeline, options);
     SparkRunner.CacheVisitor cacheVisitor =
         new SparkRunner.CacheVisitor(new TransformTranslator.Translator(), ctxt);
     pipeline.traverseTopologically(cacheVisitor);

http://git-wip-us.apache.org/repos/asf/beam/blob/55351dce/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java
index 41ccd08..3dcab26 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java
@@ -135,7 +135,7 @@ public class TrackStreamingSourcesTest {
         Pipeline pipeline,
         Class<? extends PTransform> transformClassToAssert,
         Integer... expected) {
-      this.ctxt = new EvaluationContext(jssc.sparkContext(), pipeline, jssc);
+      this.ctxt = new EvaluationContext(jssc.sparkContext(), pipeline, options, jssc);
       this.evaluator = new SparkRunner.Evaluator(
          new StreamingTransformTranslator.Translator(new TransformTranslator.Translator()), ctxt);
       this.transformClassToAssert = transformClassToAssert;

http://git-wip-us.apache.org/repos/asf/beam/blob/55351dce/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
index 6fa7a5a..f7c3f24 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
@@ -460,7 +460,7 @@ public class Pipeline {
   private Set<String> usedFullNames = new HashSet<>();
   private CoderRegistry coderRegistry;
   private final List<String> unstableNames = new ArrayList<>();
-  protected final PipelineOptions defaultOptions;
+  private final PipelineOptions defaultOptions;
 
   protected Pipeline(PipelineOptions options) {
     this.defaultOptions = options;
@@ -472,18 +472,6 @@ public class Pipeline {
   }
 
   /**
-   * Returns the default {@link PipelineOptions} provided to {@link #create(PipelineOptions)}.
-   *
-   * @deprecated see BEAM-818 Remove Pipeline.getPipelineOptions. Configuration should be explicitly
-   *     provided to a transform if it is required.
-   */
-  @Deprecated
-  public PipelineOptions getOptions() {
-    return defaultOptions;
-  }
-
-
-  /**
    * Applies a {@link PTransform} to the given {@link PInput}.
    *
    * @see Pipeline#apply

http://git-wip-us.apache.org/repos/asf/beam/blob/55351dce/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
index 2d34b22..96cae51 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
@@ -26,7 +26,6 @@ import com.fasterxml.jackson.core.TreeNode;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Predicate;
 import com.google.common.base.Predicates;
@@ -104,6 +103,8 @@ import org.junit.runners.model.Statement;
  */
 public class TestPipeline extends Pipeline implements TestRule {
 
+  private final PipelineOptions options;
+
   private static class PipelineRunEnforcement {
 
     @SuppressWarnings("WeakerAccess")
@@ -183,10 +184,7 @@ public class TestPipeline extends Pipeline implements TestRule {
     private void verifyPipelineExecution() {
       if (!isEmptyPipeline(pipeline)) {
         if (!runAttempted && !enableAutoRunIfMissing) {
-          throw new PipelineRunMissingException(
-              "The pipeline has not been run (runner: "
-                  + pipeline.getOptions().getRunner().getSimpleName()
-                  + ")");
+          throw new PipelineRunMissingException("The pipeline has not been run.");
 
         } else {
           final List<TransformHierarchy.Node> pipelineNodes = recordPipelineNodes(pipeline);
@@ -272,6 +270,11 @@ public class TestPipeline extends Pipeline implements TestRule {
 
   private TestPipeline(final PipelineOptions options) {
     super(options);
+    this.options = options;
+  }
+
+  public PipelineOptions getOptions() {
+    return this.options;
   }
 
   @Override
@@ -288,7 +291,7 @@ public class TestPipeline extends Pipeline implements TestRule {
                   .anyMatch(Annotations.Predicates.isCategoryOf(NeedsRunner.class, true));
 
           final boolean crashingRunner =
-              CrashingRunner.class.isAssignableFrom(getOptions().getRunner());
+              CrashingRunner.class.isAssignableFrom(options.getRunner());
 
           checkState(
               !(annotatedWithNeedsRunner && crashingRunner),
@@ -381,18 +384,9 @@ public class TestPipeline extends Pipeline implements TestRule {
     return this;
   }
 
-  @VisibleForTesting
-  @Override
-  /**
-   * Get this pipeline's options.
-   */
-  public PipelineOptions getOptions() {
-    return defaultOptions;
-  }
-
   @Override
   public String toString() {
-    return "TestPipeline#" + getOptions().as(ApplicationNameOptions.class).getAppName();
+    return "TestPipeline#" + options.as(ApplicationNameOptions.class).getAppName();
   }
 
   /** Creates {@link PipelineOptions} for testing. */

http://git-wip-us.apache.org/repos/asf/beam/blob/55351dce/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 0e36393..fbbf862 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -888,6 +888,26 @@ public class BigQueryIO {
     public void validate(PipelineOptions pipelineOptions) {
       BigQueryOptions options = pipelineOptions.as(BigQueryOptions.class);
 
+      // The user specified a table.
+      if (getJsonTableRef() != null && getJsonTableRef().isAccessible() && getValidate()) {
+        TableReference table = getTableWithDefaultProject(options).get();
+        DatasetService datasetService = getBigQueryServices().getDatasetService(options);
+        // Check for destination table presence and emptiness for early failure notification.
+        // Note that a presence check can fail when the table or dataset is created by an earlier
+        // stage of the pipeline. For these cases the #withoutValidation method can be used to
+        // disable the check.
+        BigQueryHelpers.verifyDatasetPresence(datasetService, table);
+        if (getCreateDisposition() == BigQueryIO.Write.CreateDisposition.CREATE_NEVER) {
+          BigQueryHelpers.verifyTablePresence(datasetService, table);
+        }
+        if (getWriteDisposition() == BigQueryIO.Write.WriteDisposition.WRITE_EMPTY) {
+          BigQueryHelpers.verifyTableNotExistOrEmpty(datasetService, table);
+        }
+      }
+    }
+
+    @Override
+    public WriteResult expand(PCollection<T> input) {
       // We must have a destination to write to!
       checkState(
           getTableFunction() != null || getJsonTableRef() != null
@@ -916,29 +936,7 @@ public class BigQueryIO {
       checkArgument(2
               > Iterables.size(Iterables.filter(allSchemaArgs, Predicates.notNull())),
           "No more than one of jsonSchema, schemaFromView, or dynamicDestinations may "
-          + "be set");
-
-      // The user specified a table.
-      if (getJsonTableRef() != null && getJsonTableRef().isAccessible() && getValidate()) {
-        TableReference table = getTableWithDefaultProject(options).get();
-        DatasetService datasetService = getBigQueryServices().getDatasetService(options);
-        // Check for destination table presence and emptiness for early failure notification.
-        // Note that a presence check can fail when the table or dataset is created by an earlier
-        // stage of the pipeline. For these cases the #withoutValidation method can be used to
-        // disable the check.
-        BigQueryHelpers.verifyDatasetPresence(datasetService, table);
-        if (getCreateDisposition() == BigQueryIO.Write.CreateDisposition.CREATE_NEVER) {
-          BigQueryHelpers.verifyTablePresence(datasetService, table);
-        }
-        if (getWriteDisposition() == BigQueryIO.Write.WriteDisposition.WRITE_EMPTY) {
-          BigQueryHelpers.verifyTableNotExistOrEmpty(datasetService, table);
-        }
-      }
-    }
-
-    @Override
-    public WriteResult expand(PCollection<T> input) {
-      validate(input.getPipeline().getOptions());
+              + "be set");
 
       DynamicDestinations<T, ?> dynamicDestinations = getDynamicDestinations();
       if (dynamicDestinations == null) {

http://git-wip-us.apache.org/repos/asf/beam/blob/55351dce/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index aabae3e..b893ad5 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -1175,6 +1175,7 @@ public class BigQueryIOTest implements Serializable {
                 .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
                 .withSchema(new TableSchema())
                 .withTestServices(fakeBqServices));
+    p.run();
   }
 
   @Test


Mime
View raw message