beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aviem...@apache.org
Subject [1/6] beam git commit: [BEAM-1763] Replace usage of PipelineRule with TestPipeline in Spark runner tests
Date Thu, 04 May 2017 18:10:33 GMT
Repository: beam
Updated Branches:
  refs/heads/master 48c8ed176 -> b73918b55


[BEAM-1763] Replace usage of PipelineRule with TestPipeline in Spark runner tests


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8d91a97b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8d91a97b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8d91a97b

Branch: refs/heads/master
Commit: 8d91a97b77fbda74c577d2cdbd507395834e147c
Parents: 0e2bb18
Author: Aviem Zur <aviemzur@gmail.com>
Authored: Wed May 3 21:06:00 2017 +0300
Committer: Aviem Zur <aviemzur@gmail.com>
Committed: Thu May 4 20:48:56 2017 +0300

----------------------------------------------------------------------
 runners/spark/pom.xml                           |  47 +++++++-
 .../runners/spark/SparkRunnerRegistrar.java     |   4 +-
 .../apache/beam/runners/spark/CacheTest.java    |  12 +-
 .../beam/runners/spark/ForceStreamingTest.java  |  18 +--
 .../apache/beam/runners/spark/PipelineRule.java | 109 -------------------
 .../runners/spark/ProvidedSparkContextTest.java |  10 +-
 .../runners/spark/SparkRunnerDebuggerTest.java  |  15 +--
 .../runners/spark/SparkRunnerRegistrarTest.java |   2 +-
 .../beam/runners/spark/StreamingTest.java       |  23 ++++
 .../metrics/sink/SparkMetricsSinkTest.java      |  12 +-
 .../beam/runners/spark/io/AvroPipelineTest.java |  10 +-
 .../beam/runners/spark/io/NumShardsTest.java    |   6 +-
 .../spark/translation/StorageLevelTest.java     |  31 +++++-
 .../translation/streaming/CreateStreamTest.java |  53 ++++-----
 .../ResumeFromCheckpointStreamingTest.java      |  50 ++++++---
 .../streaming/StreamingSourceMetricsTest.java   |  14 +--
 .../main/java/org/apache/beam/sdk/Pipeline.java |   2 +-
 .../apache/beam/sdk/testing/TestPipeline.java   |  21 +++-
 18 files changed, 217 insertions(+), 222 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index 38d250e..f7200d6 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -333,9 +333,6 @@
           <groupId>org.apache.maven.plugins</groupId>
           <artifactId>maven-surefire-plugin</artifactId>
           <configuration>
-            <excludedGroups>
-              org.apache.beam.runners.spark.UsesCheckpointRecovery
-            </excludedGroups>
             <forkCount>1</forkCount>
             <reuseForks>false</reuseForks>
             <systemPropertyVariables>
@@ -344,6 +341,50 @@
               <spark.ui.showConsoleProgress>false</spark.ui.showConsoleProgress>
             </systemPropertyVariables>
           </configuration>
+          <executions>
+            <execution>
+              <id>default-test</id>
+              <goals>
+                <goal>test</goal>
+              </goals>
+              <configuration>
+                <excludedGroups>
+                  org.apache.beam.runners.spark.UsesCheckpointRecovery,
+                  org.apache.beam.runners.spark.StreamingTest
+                </excludedGroups>
+                <systemPropertyVariables>
+                  <beamTestPipelineOptions>
+                    [
+                    "--runner=TestSparkRunner",
+                    "--streaming=false",
+                    "--enableSparkMetricSinks=true"
+                    ]
+                  </beamTestPipelineOptions>
+                </systemPropertyVariables>
+              </configuration>
+            </execution>
+            <execution>
+              <id>streaming-tests</id>
+              <phase>test</phase>
+              <goals>
+                <goal>test</goal>
+              </goals>
+              <configuration>
+                <groups>
+                  org.apache.beam.runners.spark.StreamingTest
+                </groups>
+                <systemPropertyVariables>
+                  <beamTestPipelineOptions>
+                    [
+                    "--runner=TestSparkRunner",
+                    "--forceStreaming=true",
+                    "--enableSparkMetricSinks=true"
+                    ]
+                  </beamTestPipelineOptions>
+                </systemPropertyVariables>
+              </configuration>
+            </execution>
+          </executions>
         </plugin>
         <plugin>
           <groupId>org.codehaus.mojo</groupId>

http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
index bedfda4..bf926dc 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
@@ -54,7 +54,9 @@ public final class SparkRunnerRegistrar {
   public static class Options implements PipelineOptionsRegistrar {
     @Override
     public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
-      return ImmutableList.<Class<? extends PipelineOptions>>of(SparkPipelineOptions.class);
+      return ImmutableList.<Class<? extends PipelineOptions>>of(
+          SparkPipelineOptions.class,
+          TestSparkPipelineOptions.class);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/runners/spark/src/test/java/org/apache/beam/runners/spark/CacheTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/CacheTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/CacheTest.java
index c3b48d8..24b2e7b 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/CacheTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/CacheTest.java
@@ -23,11 +23,11 @@ import org.apache.beam.runners.spark.translation.EvaluationContext;
 import org.apache.beam.runners.spark.translation.SparkContextFactory;
 import org.apache.beam.runners.spark.translation.TransformTranslator;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.junit.Rule;
 import org.junit.Test;
 
 /**
@@ -36,12 +36,12 @@ import org.junit.Test;
  */
 public class CacheTest {
 
-  @Rule
-  public final transient PipelineRule pipelineRule = PipelineRule.batch();
-
   @Test
   public void cacheCandidatesUpdaterTest() throws Exception {
-    Pipeline pipeline = pipelineRule.createPipeline();
+    SparkPipelineOptions options =
+        PipelineOptionsFactory.create().as(TestSparkPipelineOptions.class);
+    options.setRunner(TestSparkRunner.class);
+    Pipeline pipeline = Pipeline.create(options);
     PCollection<String> pCollection = pipeline.apply(Create.of("foo", "bar"));
     // first read
     pCollection.apply(Count.<String>globally());
@@ -50,7 +50,7 @@ public class CacheTest {
     // will cache the RDD representing this PCollection
     pCollection.apply(Count.<String>globally());
 
-    JavaSparkContext jsc = SparkContextFactory.getSparkContext(pipelineRule.getOptions());
+    JavaSparkContext jsc = SparkContextFactory.getSparkContext(options);
     EvaluationContext ctxt = new EvaluationContext(jsc, pipeline);
     SparkRunner.CacheVisitor cacheVisitor =
         new SparkRunner.CacheVisitor(new TransformTranslator.Translator(), ctxt);

http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java
index b60faf2..7bfc980 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java
@@ -25,9 +25,9 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.BoundedReadFromUnboundedSource;
 import org.apache.beam.sdk.io.CountingSource;
 import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.junit.Rule;
 import org.junit.Test;
 
 
@@ -44,19 +44,23 @@ import org.junit.Test;
  */
 public class ForceStreamingTest {
 
-  @Rule
-  public final PipelineRule pipelineRule = PipelineRule.streaming();
-
   @Test
   public void test() throws IOException {
-    Pipeline pipeline = pipelineRule.createPipeline();
+    TestSparkPipelineOptions options =
+        PipelineOptionsFactory.create().as(TestSparkPipelineOptions.class);
+    options.setRunner(TestSparkRunner.class);
+    options.setForceStreaming(true);
+
+    // pipeline with a bounded read.
+    Pipeline pipeline = Pipeline.create(options);
 
     // apply the BoundedReadFromUnboundedSource.
     BoundedReadFromUnboundedSource<?> boundedRead =
         Read.from(CountingSource.unbounded()).withMaxNumRecords(-1);
-    //noinspection unchecked
     pipeline.apply(boundedRead);
-    TestSparkRunner runner = TestSparkRunner.fromOptions(pipelineRule.getOptions());
+
+    // adapt reads
+    TestSparkRunner runner = TestSparkRunner.fromOptions(options);
     runner.adaptBoundedReads(pipeline);
 
     UnboundedReadDetector unboundedReadDetector = new UnboundedReadDetector();

http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/runners/spark/src/test/java/org/apache/beam/runners/spark/PipelineRule.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/PipelineRule.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/PipelineRule.java
deleted file mode 100644
index f8499f3..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/PipelineRule.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.spark;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.joda.time.Duration;
-import org.junit.rules.ExternalResource;
-import org.junit.rules.RuleChain;
-import org.junit.rules.TemporaryFolder;
-import org.junit.rules.TestName;
-import org.junit.rules.TestRule;
-import org.junit.runner.Description;
-import org.junit.runners.model.Statement;
-
-/**
- * A {@link org.junit.Rule} to provide a {@link Pipeline} instance for Spark runner tests.
- */
-public class PipelineRule implements TestRule {
-
-  private final SparkPipelineRule delegate;
-  private final RuleChain chain;
-
-  private PipelineRule(SparkPipelineRule delegate) {
-    TestName testName = new TestName();
-    this.delegate = delegate;
-    this.delegate.setTestName(testName);
-    this.chain = RuleChain.outerRule(testName).around(this.delegate);
-  }
-
-  public static PipelineRule streaming() {
-    return new PipelineRule(new SparkStreamingPipelineRule());
-  }
-
-  public static PipelineRule batch() {
-    return new PipelineRule(new SparkPipelineRule());
-  }
-
-  public Duration batchDuration() {
-    return Duration.millis(delegate.options.getBatchIntervalMillis());
-  }
-
-  public TestSparkPipelineOptions getOptions() {
-    return delegate.options;
-  }
-
-  public Pipeline createPipeline() {
-    return Pipeline.create(delegate.options);
-  }
-
-  @Override
-  public Statement apply(Statement statement, Description description) {
-    return chain.apply(statement, description);
-  }
-
-  private static class SparkStreamingPipelineRule extends SparkPipelineRule {
-
-    private final TemporaryFolder temporaryFolder = new TemporaryFolder();
-
-    @Override
-    protected void before() throws Throwable {
-      super.before();
-      temporaryFolder.create();
-      options.setForceStreaming(true);
-      options.setCheckpointDir(
-          temporaryFolder.newFolder(options.getJobName()).toURI().toURL().toString());
-    }
-
-    @Override
-    protected void after() {
-      temporaryFolder.delete();
-    }
-  }
-
-  private static class SparkPipelineRule extends ExternalResource {
-
-    protected final TestSparkPipelineOptions options =
-        PipelineOptionsFactory.as(TestSparkPipelineOptions.class);
-
-    private TestName testName;
-
-    public void setTestName(TestName testName) {
-      this.testName = testName;
-    }
-
-    @Override
-    protected void before() throws Throwable {
-      options.setRunner(TestSparkRunner.class);
-      options.setEnableSparkMetricSinks(false);
-      options.setJobName(
-          testName != null ? testName.getMethodName() : "test-at-" + System.currentTimeMillis());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
index 36ba863..8112993 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
@@ -27,9 +27,11 @@ import java.util.List;
 import java.util.Set;
 import org.apache.beam.runners.spark.examples.WordCount;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.values.PCollection;
@@ -95,7 +97,9 @@ public class ProvidedSparkContextTest {
         PAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET);
 
         // Run test from pipeline
-        p.run().waitUntilFinish();
+        PipelineResult result = p.run();
+
+        TestPipeline.verifyPAssertsSucceeded(p, result);
     }
 
     private void testWithInvalidContext(JavaSparkContext jsc) {
@@ -104,11 +108,9 @@ public class ProvidedSparkContextTest {
         Pipeline p = Pipeline.create(options);
         PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder
                 .of()));
-        PCollection<String> output = inputWords.apply(new WordCount.CountWords())
+        inputWords.apply(new WordCount.CountWords())
                 .apply(MapElements.via(new WordCount.FormatAsTextFn()));
 
-        PAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET);
-
         try {
             p.run().waitUntilFinish();
             fail("Should throw an exception when The provided Spark context is null or stopped");

http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
index ea058b2..9009751 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
@@ -27,6 +27,8 @@ import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.io.kafka.KafkaIO;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.Distinct;
@@ -48,7 +50,6 @@ import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.hamcrest.Matchers;
 import org.joda.time.Duration;
-import org.junit.Rule;
 import org.junit.Test;
 
 
@@ -57,15 +58,9 @@ import org.junit.Test;
  */
 public class SparkRunnerDebuggerTest {
 
-  @Rule
-  public final PipelineRule batchPipelineRule = PipelineRule.batch();
-
-  @Rule
-  public final PipelineRule streamingPipelineRule = PipelineRule.streaming();
-
   @Test
   public void debugBatchPipeline() {
-    TestSparkPipelineOptions options = batchPipelineRule.getOptions();
+    PipelineOptions options = PipelineOptionsFactory.create().as(TestSparkPipelineOptions.class);
     options.setRunner(SparkRunnerDebugger.class);
 
     Pipeline pipeline = Pipeline.create(options);
@@ -111,7 +106,9 @@ public class SparkRunnerDebuggerTest {
 
   @Test
   public void debugStreamingPipeline() {
-    TestSparkPipelineOptions options = streamingPipelineRule.getOptions();
+    TestSparkPipelineOptions options =
+        PipelineOptionsFactory.create().as(TestSparkPipelineOptions.class);
+    options.setForceStreaming(true);
     options.setRunner(SparkRunnerDebugger.class);
 
     Pipeline pipeline = Pipeline.create(options);

http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
index 4e1fd7c..75899f9 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
@@ -38,7 +38,7 @@ public class SparkRunnerRegistrarTest {
   @Test
   public void testOptions() {
     assertEquals(
-        ImmutableList.of(SparkPipelineOptions.class),
+        ImmutableList.of(SparkPipelineOptions.class, TestSparkPipelineOptions.class),
         new SparkRunnerRegistrar.Options().getPipelineOptions());
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/runners/spark/src/test/java/org/apache/beam/runners/spark/StreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/StreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/StreamingTest.java
new file mode 100644
index 0000000..a34c184
--- /dev/null
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/StreamingTest.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark;
+
+/**
+ * Category tag for tests that should be run in streaming mode.
+ */
+public interface StreamingTest {}

http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/SparkMetricsSinkTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/SparkMetricsSinkTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/SparkMetricsSinkTest.java
index b0ad972..fff95cb 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/SparkMetricsSinkTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/SparkMetricsSinkTest.java
@@ -26,11 +26,10 @@ import com.google.common.collect.ImmutableSet;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
-import org.apache.beam.runners.spark.PipelineRule;
 import org.apache.beam.runners.spark.examples.WordCount;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.values.PCollection;
@@ -48,12 +47,7 @@ public class SparkMetricsSinkTest {
   public ExternalResource inMemoryMetricsSink = new InMemoryMetricsSinkRule();
 
   @Rule
-  public final PipelineRule pipelineRule = PipelineRule.batch();
-
-  private Pipeline createSparkPipeline() {
-    pipelineRule.getOptions().setEnableSparkMetricSinks(true);
-    return pipelineRule.createPipeline();
-  }
+  public final TestPipeline pipeline = TestPipeline.create();
 
   private void runPipeline() {
     final List<String> words =
@@ -62,8 +56,6 @@ public class SparkMetricsSinkTest {
     final Set<String> expectedCounts =
         ImmutableSet.of("hi: 5", "there: 1", "sue: 2", "bob: 2");
 
-    final Pipeline pipeline = createSparkPipeline();
-
     final PCollection<String> output =
         pipeline
         .apply(Create.of(words).withCoder(StringUtf8Coder.of()))

http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
index 7188dc5..adde8d2 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
@@ -33,9 +33,8 @@ import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.beam.runners.spark.PipelineRule;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.AvroIO;
+import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.values.PCollection;
 import org.junit.Before;
 import org.junit.Rule;
@@ -54,7 +53,7 @@ public class AvroPipelineTest {
   public final TemporaryFolder tmpDir = new TemporaryFolder();
 
   @Rule
-  public final PipelineRule pipelineRule = PipelineRule.batch();
+  public final TestPipeline pipeline = TestPipeline.create();
 
   @Before
   public void setUp() throws IOException {
@@ -72,11 +71,10 @@ public class AvroPipelineTest {
     savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
     populateGenericFile(Lists.newArrayList(savedRecord), schema);
 
-    Pipeline p = pipelineRule.createPipeline();
-    PCollection<GenericRecord> input = p.apply(
+    PCollection<GenericRecord> input = pipeline.apply(
         AvroIO.readGenericRecords(schema).from(inputFile.getAbsolutePath()));
     input.apply(AvroIO.writeGenericRecords(schema).to(outputDir.getAbsolutePath()));
-    p.run().waitUntilFinish();
+    pipeline.run();
 
     List<GenericRecord> records = readGenericFile();
     assertEquals(Lists.newArrayList(savedRecord), records);

http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
index 5021744..55ee938 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
@@ -30,11 +30,10 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
-import org.apache.beam.runners.spark.PipelineRule;
 import org.apache.beam.runners.spark.examples.WordCount;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.values.PCollection;
@@ -59,7 +58,7 @@ public class NumShardsTest {
   public final TemporaryFolder tmpDir = new TemporaryFolder();
 
   @Rule
-  public final PipelineRule pipelineRule = PipelineRule.batch();
+  public final TestPipeline p = TestPipeline.create();
 
   @Before
   public void setUp() throws IOException {
@@ -69,7 +68,6 @@ public class NumShardsTest {
 
   @Test
   public void testText() throws Exception {
-    Pipeline p = pipelineRule.createPipeline();
     PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder.of()));
     PCollection<String> output = inputWords.apply(new WordCount.CountWords())
         .apply(MapElements.via(new WordCount.FormatAsTextFn()));

http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java
index 2b7b87b..8f2e681 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java
@@ -15,30 +15,49 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.beam.runners.spark.translation;
 
-import org.apache.beam.runners.spark.PipelineRule;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.values.PCollection;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 
+
 /**
  * Test the RDD storage level defined by user.
  */
 public class StorageLevelTest {
 
+  private static String beamTestPipelineOptions;
+
   @Rule
-  public final transient PipelineRule pipelineRule = PipelineRule.batch();
+  public final TestPipeline pipeline = TestPipeline.create();
+
+  @BeforeClass
+  public static void init() {
+    beamTestPipelineOptions =
+        System.getProperty(TestPipeline.PROPERTY_BEAM_TEST_PIPELINE_OPTIONS);
+
+    System.setProperty(
+        TestPipeline.PROPERTY_BEAM_TEST_PIPELINE_OPTIONS,
+        beamTestPipelineOptions.replace("]", ", \"--storageLevel=DISK_ONLY\"]"));
+  }
+
+  @AfterClass
+  public static void teardown() {
+    System.setProperty(
+        TestPipeline.PROPERTY_BEAM_TEST_PIPELINE_OPTIONS,
+        beamTestPipelineOptions);
+  }
 
   @Test
   public void test() throws Exception {
-    pipelineRule.getOptions().setStorageLevel("DISK_ONLY");
-    Pipeline pipeline = pipelineRule.createPipeline();
-
     PCollection<String> pCollection = pipeline.apply(Create.of("foo"));
 
     // by default, the Spark runner doesn't cache the RDD if it accessed only one time.

http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
index dd52c05..770e0c0 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
@@ -24,13 +24,14 @@ import static org.junit.Assert.assertThat;
 
 import java.io.IOException;
 import java.io.Serializable;
-import org.apache.beam.runners.spark.PipelineRule;
 import org.apache.beam.runners.spark.ReuseSparkContextRule;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.runners.spark.StreamingTest;
 import org.apache.beam.runners.spark.io.CreateStream;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -61,6 +62,7 @@ import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.junit.rules.ExpectedException;
 
 
@@ -74,10 +76,11 @@ import org.junit.rules.ExpectedException;
  * {@link org.apache.spark.streaming.dstream.QueueInputDStream} and advance the system's WMs.
  * //TODO: add synchronized/processing time trigger.
  */
+@Category(StreamingTest.class)
 public class CreateStreamTest implements Serializable {
 
   @Rule
-  public final transient PipelineRule pipelineRule = PipelineRule.streaming();
+  public final transient TestPipeline p = TestPipeline.create();
   @Rule
   public final transient ReuseSparkContextRule noContextResue = ReuseSparkContextRule.no();
   @Rule
@@ -85,10 +88,9 @@ public class CreateStreamTest implements Serializable {
 
   @Test
   public void testLateDataAccumulating() throws IOException {
-    Pipeline p = pipelineRule.createPipeline();
     Instant instant = new Instant(0);
     CreateStream<Integer> source =
-        CreateStream.of(VarIntCoder.of(), pipelineRule.batchDuration())
+        CreateStream.of(VarIntCoder.of(), batchDuration())
             .emptyBatch()
             .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(6)))
             .nextBatch(
@@ -159,9 +161,8 @@ public class CreateStreamTest implements Serializable {
 
   @Test
   public void testDiscardingMode() throws IOException {
-    Pipeline p = pipelineRule.createPipeline();
     CreateStream<String> source =
-        CreateStream.of(StringUtf8Coder.of(), pipelineRule.batchDuration())
+        CreateStream.of(StringUtf8Coder.of(), batchDuration())
             .nextBatch(
                 TimestampedValue.of("firstPane", new Instant(100)),
                 TimestampedValue.of("alsoFirstPane", new Instant(200)))
@@ -208,10 +209,9 @@ public class CreateStreamTest implements Serializable {
 
   @Test
   public void testFirstElementLate() throws IOException {
-    Pipeline p = pipelineRule.createPipeline();
     Instant lateElementTimestamp = new Instant(-1_000_000);
     CreateStream<String> source =
-        CreateStream.of(StringUtf8Coder.of(), pipelineRule.batchDuration())
+        CreateStream.of(StringUtf8Coder.of(), batchDuration())
             .emptyBatch()
             .advanceWatermarkForNextBatch(new Instant(0))
             .nextBatch(
@@ -242,10 +242,9 @@ public class CreateStreamTest implements Serializable {
 
   @Test
   public void testElementsAtAlmostPositiveInfinity() throws IOException {
-    Pipeline p = pipelineRule.createPipeline();
     Instant endOfGlobalWindow = GlobalWindow.INSTANCE.maxTimestamp();
     CreateStream<String> source =
-        CreateStream.of(StringUtf8Coder.of(), pipelineRule.batchDuration())
+        CreateStream.of(StringUtf8Coder.of(), batchDuration())
             .nextBatch(
                 TimestampedValue.of("foo", endOfGlobalWindow),
                 TimestampedValue.of("bar", endOfGlobalWindow))
@@ -267,13 +266,12 @@ public class CreateStreamTest implements Serializable {
 
   @Test
   public void testMultipleStreams() throws IOException {
-    Pipeline p = pipelineRule.createPipeline();
     CreateStream<String> source =
-        CreateStream.of(StringUtf8Coder.of(), pipelineRule.batchDuration())
+        CreateStream.of(StringUtf8Coder.of(), batchDuration())
             .nextBatch("foo", "bar")
             .advanceNextBatchWatermarkToInfinity();
     CreateStream<Integer> other =
-        CreateStream.of(VarIntCoder.of(), pipelineRule.batchDuration())
+        CreateStream.of(VarIntCoder.of(), batchDuration())
             .nextBatch(1, 2, 3, 4)
             .advanceNextBatchWatermarkToInfinity();
 
@@ -298,10 +296,9 @@ public class CreateStreamTest implements Serializable {
 
   @Test
   public void testFlattenedWithWatermarkHold() throws IOException {
-    Pipeline p = pipelineRule.createPipeline();
     Instant instant = new Instant(0);
     CreateStream<Integer> source1 =
-        CreateStream.of(VarIntCoder.of(), pipelineRule.batchDuration())
+        CreateStream.of(VarIntCoder.of(), batchDuration())
             .emptyBatch()
             .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(5)))
             .nextBatch(
@@ -310,7 +307,7 @@ public class CreateStreamTest implements Serializable {
                 TimestampedValue.of(3, instant))
             .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(10)));
     CreateStream<Integer> source2 =
-        CreateStream.of(VarIntCoder.of(), pipelineRule.batchDuration())
+        CreateStream.of(VarIntCoder.of(), batchDuration())
             .emptyBatch()
             .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(1)))
             .nextBatch(
@@ -323,14 +320,14 @@ public class CreateStreamTest implements Serializable {
             .advanceNextBatchWatermarkToInfinity();
 
     PCollection<Integer> windowed1 = p
-        .apply(source1)
-        .apply(Window.<Integer>into(FixedWindows.of(Duration.standardMinutes(5)))
+        .apply("CreateStream1", source1)
+        .apply("Window1", Window.<Integer>into(FixedWindows.of(Duration.standardMinutes(5)))
             .triggering(AfterWatermark.pastEndOfWindow())
             .accumulatingFiredPanes()
             .withAllowedLateness(Duration.ZERO));
     PCollection<Integer> windowed2 = p
-        .apply(source2)
-        .apply(Window.<Integer>into(FixedWindows.of(Duration.standardMinutes(5)))
+        .apply("CreateStream2", source2)
+        .apply("Window2", Window.<Integer>into(FixedWindows.of(Duration.standardMinutes(5)))
             .triggering(AfterWatermark.pastEndOfWindow())
             .accumulatingFiredPanes()
             .withAllowedLateness(Duration.ZERO));
@@ -357,10 +354,9 @@ public class CreateStreamTest implements Serializable {
    */
   @Test
   public void testMultiOutputParDo() throws IOException {
-    Pipeline p = pipelineRule.createPipeline();
     Instant instant = new Instant(0);
     CreateStream<Integer> source1 =
-        CreateStream.of(VarIntCoder.of(), pipelineRule.batchDuration())
+        CreateStream.of(VarIntCoder.of(), batchDuration())
             .emptyBatch()
             .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(5)))
             .nextBatch(
@@ -397,7 +393,7 @@ public class CreateStreamTest implements Serializable {
   @Test
   public void testElementAtPositiveInfinityThrows() {
     CreateStream<Integer> source =
-        CreateStream.of(VarIntCoder.of(), pipelineRule.batchDuration())
+        CreateStream.of(VarIntCoder.of(), batchDuration())
             .nextBatch(TimestampedValue.of(-1, BoundedWindow.TIMESTAMP_MAX_VALUE.minus(1L)))
             .advanceNextBatchWatermarkToInfinity();
     thrown.expect(IllegalArgumentException.class);
@@ -407,7 +403,7 @@ public class CreateStreamTest implements Serializable {
   @Test
   public void testAdvanceWatermarkNonMonotonicThrows() {
     CreateStream<Integer> source =
-        CreateStream.of(VarIntCoder.of(), pipelineRule.batchDuration())
+        CreateStream.of(VarIntCoder.of(), batchDuration())
             .advanceWatermarkForNextBatch(new Instant(0L));
     thrown.expect(IllegalArgumentException.class);
     source
@@ -418,9 +414,14 @@ public class CreateStreamTest implements Serializable {
   @Test
   public void testAdvanceWatermarkEqualToPositiveInfinityThrows() {
     CreateStream<Integer> source =
-        CreateStream.of(VarIntCoder.of(), pipelineRule.batchDuration())
+        CreateStream.of(VarIntCoder.of(), batchDuration())
             .advanceWatermarkForNextBatch(BoundedWindow.TIMESTAMP_MAX_VALUE.minus(1L));
     thrown.expect(IllegalArgumentException.class);
     source.advanceWatermarkForNextBatch(BoundedWindow.TIMESTAMP_MAX_VALUE);
   }
+
+  private Duration batchDuration() {
+    return Duration.millis(
+        (p.getOptions().as(SparkPipelineOptions.class)).getBatchIntervalMillis());
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
index 33571f0..584edac 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
@@ -28,15 +28,16 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
-import org.apache.beam.runners.spark.PipelineRule;
 import org.apache.beam.runners.spark.ReuseSparkContextRule;
 import org.apache.beam.runners.spark.SparkPipelineResult;
 import org.apache.beam.runners.spark.TestSparkPipelineOptions;
+import org.apache.beam.runners.spark.TestSparkRunner;
 import org.apache.beam.runners.spark.UsesCheckpointRecovery;
 import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator;
 import org.apache.beam.runners.spark.io.MicrobatchSource;
@@ -53,6 +54,7 @@ import org.apache.beam.sdk.metrics.MetricNameFilter;
 import org.apache.beam.sdk.metrics.MetricResult;
 import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.metrics.MetricsFilter;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -81,11 +83,12 @@ import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-
+import org.junit.rules.TemporaryFolder;
 
 /**
  * Tests DStream recovery from checkpoint.
@@ -96,24 +99,34 @@ import org.junit.experimental.categories.Category;
  * {@link Metrics} values that are expected to resume from previous count and a side-input that is
  * expected to recover as well.
  */
-public class ResumeFromCheckpointStreamingTest {
+public class ResumeFromCheckpointStreamingTest implements Serializable {
   private static final EmbeddedKafkaCluster.EmbeddedZookeeper EMBEDDED_ZOOKEEPER =
       new EmbeddedKafkaCluster.EmbeddedZookeeper();
   private static final EmbeddedKafkaCluster EMBEDDED_KAFKA_CLUSTER =
       new EmbeddedKafkaCluster(EMBEDDED_ZOOKEEPER.getConnection(), new Properties());
   private static final String TOPIC = "kafka_beam_test_topic";
 
+  private transient TemporaryFolder temporaryFolder;
+
   @Rule
   public final transient ReuseSparkContextRule noContextReuse = ReuseSparkContextRule.no();
-  @Rule
-  public final transient PipelineRule pipelineRule = PipelineRule.streaming();
 
   @BeforeClass
-  public static void init() throws IOException {
+  public static void setup() throws IOException {
     EMBEDDED_ZOOKEEPER.startup();
     EMBEDDED_KAFKA_CLUSTER.startup();
   }
 
+  @Before
+  public void init() {
+    temporaryFolder = new TemporaryFolder();
+    try {
+      temporaryFolder.create();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   private static void produce(Map<String, Instant> messages) {
     Properties producerProps = new Properties();
     producerProps.putAll(EMBEDDED_KAFKA_CLUSTER.getProps());
@@ -148,7 +161,7 @@ public class ResumeFromCheckpointStreamingTest {
             .build();
 
     // first run should expect EOT matching the last injected element.
-    SparkPipelineResult res = run(pipelineRule, Optional.of(new Instant(400)), 0);
+    SparkPipelineResult res = run(Optional.of(new Instant(400)), 0);
 
     assertThat(res.metrics().queryMetrics(metricsFilter).counters(),
         hasItem(attemptedMetricsResult(ResumeFromCheckpointStreamingTest.class.getName(),
@@ -169,7 +182,7 @@ public class ResumeFromCheckpointStreamingTest {
     ));
 
     // recovery should resume from last read offset, and read the second batch of input.
-    res = runAgain(pipelineRule, 1);
+    res = runAgain(1);
     // assertions 2:
     assertThat(res.metrics().queryMetrics(metricsFilter).counters(),
         hasItem(attemptedMetricsResult(ResumeFromCheckpointStreamingTest.class.getName(),
@@ -209,18 +222,18 @@ public class ResumeFromCheckpointStreamingTest {
         String.format("Found %d failed assertions.", failedAssertions),
         failedAssertions,
         is(0L));
-
   }
 
-  private SparkPipelineResult runAgain(PipelineRule pipelineRule, int expectedAssertions) {
+  private SparkPipelineResult runAgain(int expectedAssertions) {
     // sleep before next run.
     Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS);
-    return run(pipelineRule, Optional.<Instant>absent(), expectedAssertions);
+    return run(Optional.<Instant>absent(), expectedAssertions);
   }
 
   @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
-  private static SparkPipelineResult run(
-      PipelineRule pipelineRule, Optional<Instant> stopWatermarkOption, int expectedAssertions) {
+  private SparkPipelineResult run(
+      Optional<Instant> stopWatermarkOption,
+      int expectedAssertions) {
     KafkaIO.Read<String, Instant> read = KafkaIO.<String, Instant>read()
         .withBootstrapServers(EMBEDDED_KAFKA_CLUSTER.getBrokerList())
         .withTopics(Collections.singletonList(TOPIC))
@@ -242,15 +255,21 @@ public class ResumeFromCheckpointStreamingTest {
           }
         });
 
-    TestSparkPipelineOptions options = pipelineRule.getOptions();
+    TestSparkPipelineOptions options =
+        PipelineOptionsFactory.create().as(TestSparkPipelineOptions.class);
     options.setSparkMaster("local[*]");
     options.setCheckpointDurationMillis(options.getBatchIntervalMillis());
     options.setExpectedAssertions(expectedAssertions);
+    options.setRunner(TestSparkRunner.class);
+    options.setEnableSparkMetricSinks(false);
+    options.setForceStreaming(true);
+    options.setCheckpointDir(temporaryFolder.getRoot().getPath());
     // timeout is per execution so it can be injected by the caller.
     if (stopWatermarkOption.isPresent()) {
       options.setStopPipelineWatermark(stopWatermarkOption.get().getMillis());
     }
-    Pipeline p = pipelineRule.createPipeline();
+
+    Pipeline p = Pipeline.create(options);
 
     PCollection<String> expectedCol =
         p.apply(Create.of(ImmutableList.of("side1", "side2")).withCoder(StringUtf8Coder.of()));
@@ -354,5 +373,4 @@ public class ResumeFromCheckpointStreamingTest {
       }
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java
index 5a4b1b5..df6027c 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java
@@ -23,9 +23,7 @@ import static org.hamcrest.Matchers.hasItem;
 import static org.junit.Assert.assertThat;
 
 import java.io.Serializable;
-import org.apache.beam.runners.spark.PipelineRule;
-import org.apache.beam.runners.spark.TestSparkPipelineOptions;
-import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.runners.spark.StreamingTest;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.io.Source;
@@ -34,10 +32,11 @@ import org.apache.beam.sdk.metrics.MetricNameFilter;
 import org.apache.beam.sdk.metrics.MetricQueryResults;
 import org.apache.beam.sdk.metrics.MetricsFilter;
 import org.apache.beam.sdk.metrics.SourceMetrics;
+import org.apache.beam.sdk.testing.TestPipeline;
 import org.joda.time.Duration;
 import org.junit.Rule;
 import org.junit.Test;
-
+import org.junit.experimental.categories.Category;
 
 /**
  * Verify metrics support for {@link Source Sources} in streaming pipelines.
@@ -47,14 +46,11 @@ public class StreamingSourceMetricsTest implements Serializable {
 
   // Force streaming pipeline using pipeline rule.
   @Rule
-  public final transient PipelineRule pipelineRule = PipelineRule.streaming();
+  public final transient TestPipeline pipeline = TestPipeline.create();
 
   @Test
+  @Category(StreamingTest.class)
   public void testUnboundedSourceMetrics() {
-    TestSparkPipelineOptions options = pipelineRule.getOptions();
-
-    Pipeline pipeline = Pipeline.create(options);
-
     final long numElements = 1000;
 
     pipeline.apply(

http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
index 6b15f0d..6fa7a5a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
@@ -460,7 +460,7 @@ public class Pipeline {
   private Set<String> usedFullNames = new HashSet<>();
   private CoderRegistry coderRegistry;
   private final List<String> unstableNames = new ArrayList<>();
-  private final PipelineOptions defaultOptions;
+  protected final PipelineOptions defaultOptions;
 
   protected Pipeline(PipelineOptions options) {
     this.defaultOptions = options;

http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
index d8fe51d..2d34b22 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
@@ -26,6 +26,7 @@ import com.fasterxml.jackson.core.TreeNode;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Predicate;
 import com.google.common.base.Predicates;
@@ -244,8 +245,11 @@ public class TestPipeline extends Pipeline implements TestRule {
     }
   }
 
-  static final String PROPERTY_BEAM_TEST_PIPELINE_OPTIONS = "beamTestPipelineOptions";
+  /** System property used to set {@link TestPipelineOptions}. */
+  public static final String PROPERTY_BEAM_TEST_PIPELINE_OPTIONS = "beamTestPipelineOptions";
+
   static final String PROPERTY_USE_DEFAULT_DUMMY_RUNNER = "beamUseDummyRunner";
+
   private static final ObjectMapper MAPPER = new ObjectMapper().registerModules(
       ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
 
@@ -331,7 +335,7 @@ public class TestPipeline extends Pipeline implements TestRule {
     try {
       enforcement.get().beforePipelineExecution();
       pipelineResult = super.run();
-      verifyPAssertsSucceeded(pipelineResult);
+      verifyPAssertsSucceeded(this, pipelineResult);
     } catch (RuntimeException exc) {
       Throwable cause = exc.getCause();
       if (cause instanceof AssertionError) {
@@ -377,6 +381,15 @@ public class TestPipeline extends Pipeline implements TestRule {
     return this;
   }
 
+  @VisibleForTesting
+  @Override
+  /**
+   * Get this pipeline's options.
+   */
+  public PipelineOptions getOptions() {
+    return defaultOptions;
+  }
+
   @Override
   public String toString() {
     return "TestPipeline#" + getOptions().as(ApplicationNameOptions.class).getAppName();
@@ -501,9 +514,9 @@ public class TestPipeline extends Pipeline implements TestRule {
    * <p>Note this only runs for runners which support Metrics. Runners which do not should verify
    * this in some other way. See: https://issues.apache.org/jira/browse/BEAM-2001</p>
    */
-  private void verifyPAssertsSucceeded(PipelineResult pipelineResult) {
+  public static void verifyPAssertsSucceeded(Pipeline pipeline, PipelineResult pipelineResult) {
     if (MetricsEnvironment.isMetricsSupported()) {
-      long expectedNumberOfAssertions = (long) PAssert.countAsserts(this);
+      long expectedNumberOfAssertions = (long) PAssert.countAsserts(pipeline);
 
       long successfulAssertions = 0;
       Iterable<MetricResult<Long>> successCounterResults =


Mime
View raw message