beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [09/17] incubator-beam git commit: Migrated the beam-sdks-java-core module to TestPipeline as a JUnit rule. Plus, fixed some checkstyle errors from previous modules' migration.
Date Tue, 20 Dec 2016 19:04:59 GMT
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
index 2a89a18..3bc0a65 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
@@ -38,7 +38,6 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Random;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.Coder;
@@ -76,12 +75,12 @@ import org.junit.runners.JUnit4;
 @SuppressWarnings("unchecked")
 public class CreateTest {
   @Rule public final ExpectedException thrown = ExpectedException.none();
+  @Rule public final TestPipeline p = TestPipeline.create();
+
 
   @Test
   @Category(RunnableOnService.class)
   public void testCreate() {
-    Pipeline p = TestPipeline.create();
-
     PCollection<String> output =
         p.apply(Create.of(LINES));
 
@@ -93,8 +92,6 @@ public class CreateTest {
   @Test
   @Category(RunnableOnService.class)
   public void testCreateEmpty() {
-    Pipeline p = TestPipeline.create();
-
     PCollection<String> output =
         p.apply(Create.of(NO_LINES)
             .withCoder(StringUtf8Coder.of()));
@@ -106,7 +103,7 @@ public class CreateTest {
 
   @Test
   public void testCreateEmptyInfersCoder() {
-    Pipeline p = TestPipeline.create();
+    p.enableAbandonedNodeEnforcement(false);
 
     PCollection<Object> output =
         p.apply(Create.of());
@@ -126,8 +123,6 @@ public class CreateTest {
     thrown.expectMessage(
         Matchers.containsString("Unable to infer a coder"));
 
-    Pipeline p = TestPipeline.create();
-
     // Create won't infer a default coder in this case.
     p.apply(Create.of(new Record(), new Record2()));
 
@@ -137,8 +132,6 @@ public class CreateTest {
   @Test
   @Category(RunnableOnService.class)
   public void testCreateWithNullsAndValues() throws Exception {
-    Pipeline p = TestPipeline.create();
-
     PCollection<String> output =
         p.apply(Create.of(null, "test1", null, "test2", null)
             .withCoder(SerializableCoder.of(String.class)));
@@ -150,8 +143,6 @@ public class CreateTest {
   @Test
   @Category(NeedsRunner.class)
   public void testCreateParameterizedType() throws Exception {
-    Pipeline p = TestPipeline.create();
-
     PCollection<TimestampedValue<String>> output =
         p.apply(Create.of(
             TimestampedValue.of("a", new Instant(0)),
@@ -216,7 +207,6 @@ public class CreateTest {
     Create.Values<UnserializableRecord> create =
         Create.of(elements).withCoder(new UnserializableRecord.UnserializableRecordCoder());
 
-    TestPipeline p = TestPipeline.create();
     PAssert.that(p.apply(create))
         .containsInAnyOrder(
             new UnserializableRecord("foo"),
@@ -235,8 +225,6 @@ public class CreateTest {
   @Test
   @Category(RunnableOnService.class)
   public void testCreateTimestamped() {
-    Pipeline p = TestPipeline.create();
-
     List<TimestampedValue<String>> data = Arrays.asList(
         TimestampedValue.of("a", new Instant(1L)),
         TimestampedValue.of("b", new Instant(2L)),
@@ -254,8 +242,6 @@ public class CreateTest {
   @Test
   @Category(RunnableOnService.class)
   public void testCreateTimestampedEmpty() {
-    Pipeline p = TestPipeline.create();
-
     PCollection<String> output = p
         .apply(Create.timestamped(new ArrayList<TimestampedValue<String>>())
             .withCoder(StringUtf8Coder.of()));
@@ -266,7 +252,7 @@ public class CreateTest {
 
   @Test
   public void testCreateTimestampedEmptyInfersCoder() {
-    Pipeline p = TestPipeline.create();
+    p.enableAbandonedNodeEnforcement(false);
 
     PCollection<Object> output = p
         .apply(Create.timestamped());
@@ -280,8 +266,6 @@ public class CreateTest {
     thrown.expectMessage(
         Matchers.containsString("Unable to infer a coder"));
 
-    Pipeline p = TestPipeline.create();
-
     // Create won't infer a default coder in this case.
     PCollection<Record> c = p.apply(Create.timestamped(
         TimestampedValue.of(new Record(), new Instant(0)),
@@ -295,7 +279,6 @@ public class CreateTest {
   @Test
   @Category(RunnableOnService.class)
   public void testCreateWithVoidType() throws Exception {
-    Pipeline p = TestPipeline.create();
     PCollection<Void> output = p.apply(Create.of((Void) null, (Void) null));
     PAssert.that(output).containsInAnyOrder((Void) null, (Void) null);
     p.run();
@@ -304,8 +287,6 @@ public class CreateTest {
   @Test
   @Category(RunnableOnService.class)
   public void testCreateWithKVVoidType() throws Exception {
-    Pipeline p = TestPipeline.create();
-
     PCollection<KV<Void, Void>> output = p.apply(Create.of(
         KV.of((Void) null, (Void) null),
         KV.of((Void) null, (Void) null)));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DistinctTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DistinctTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DistinctTest.java
index 257b364..b3b3925 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DistinctTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DistinctTest.java
@@ -24,13 +24,13 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -41,6 +41,10 @@ import org.junit.runners.JUnit4;
  */
 @RunWith(JUnit4.class)
 public class DistinctTest {
+
+  @Rule
+  public final TestPipeline p = TestPipeline.create();
+
   @Test
   @Category(RunnableOnService.class)
   public void testDistinct() {
@@ -53,8 +57,6 @@ public class DistinctTest {
         "k2",
         "k3");
 
-    Pipeline p = TestPipeline.create();
-
     PCollection<String> input =
         p.apply(Create.of(strings)
             .withCoder(StringUtf8Coder.of()));
@@ -72,8 +74,6 @@ public class DistinctTest {
   public void testDistinctEmpty() {
     List<String> strings = Arrays.asList();
 
-    Pipeline p = TestPipeline.create();
-
     PCollection<String> input =
         p.apply(Create.of(strings)
             .withCoder(StringUtf8Coder.of()));
@@ -115,8 +115,6 @@ public class DistinctTest {
         KV.of("k1", "v2"),
         KV.of("k2", "v1"));
 
-    Pipeline p = TestPipeline.create();
-
     PCollection<KV<String, String>> input = p.apply(Create.of(strings));
 
     PCollection<KV<String, String>> output =

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java
index e5f5cb6..19b7c51 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java
@@ -40,6 +40,9 @@ import org.junit.runners.JUnit4;
 /** Tests for {@link DoFn}. */
 @RunWith(JUnit4.class)
 public class DoFnTest implements Serializable {
+
+  @Rule public final transient TestPipeline pipeline = TestPipeline.create();
+
   @Rule
   public transient ExpectedException thrown = ExpectedException.none();
 
@@ -200,7 +203,6 @@ public class DoFnTest implements Serializable {
    * Initialize a test pipeline with the specified {@link DoFn}.
    */
   private <InputT, OutputT> TestPipeline createTestPipeline(DoFn<InputT, OutputT> fn) {
-    TestPipeline pipeline = TestPipeline.create();
     pipeline.apply(Create.of((InputT) null))
      .apply(ParDo.of(fn));
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
index 2dafa27..3859c9f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
@@ -52,6 +52,8 @@ import org.junit.runners.JUnit4;
  */
 @RunWith(JUnit4.class)
 public class DoFnTesterTest {
+
+  @Rule public final TestPipeline p = TestPipeline.create();
   @Rule public ExpectedException thrown = ExpectedException.none();
 
   @Test
@@ -324,7 +326,7 @@ public class DoFnTesterTest {
   public void fnWithSideInputDefault() throws Exception {
     final PCollectionView<Integer> value =
         PCollectionViews.singletonView(
-            TestPipeline.create(), WindowingStrategy.globalDefault(), true, 0, VarIntCoder.of());
+            p, WindowingStrategy.globalDefault(), true, 0, VarIntCoder.of());
 
     try (DoFnTester<Integer, Integer> tester = DoFnTester.of(new SideInputDoFn(value))) {
       tester.processElement(1);
@@ -339,7 +341,7 @@ public class DoFnTesterTest {
   public void fnWithSideInputExplicit() throws Exception {
     final PCollectionView<Integer> value =
         PCollectionViews.singletonView(
-            TestPipeline.create(), WindowingStrategy.globalDefault(), true, 0, VarIntCoder.of());
+            p, WindowingStrategy.globalDefault(), true, 0, VarIntCoder.of());
 
     try (DoFnTester<Integer, Integer> tester = DoFnTester.of(new SideInputDoFn(value))) {
       tester.setSideInput(value, GlobalWindow.INSTANCE, -2);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java
index 5221f75..81e1d02 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java
@@ -26,6 +26,7 @@ import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.PCollection;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -57,11 +58,12 @@ public class FilterTest implements Serializable {
     }
   }
 
+  @Rule
+  public final TestPipeline p = TestPipeline.create();
+
   @Test
   @Category(RunnableOnService.class)
   public void testIdentityFilterByPredicate() {
-    TestPipeline p = TestPipeline.create();
-
     PCollection<Integer> output = p
         .apply(Create.of(591, 11789, 1257, 24578, 24799, 307))
         .apply(Filter.by(new TrivialFn(true)));
@@ -73,8 +75,6 @@ public class FilterTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testNoFilterByPredicate() {
-    TestPipeline p = TestPipeline.create();
-
     PCollection<Integer> output = p
         .apply(Create.of(1, 2, 4, 5))
         .apply(Filter.by(new TrivialFn(false)));
@@ -86,8 +86,6 @@ public class FilterTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testFilterByPredicate() {
-    TestPipeline p = TestPipeline.create();
-
     PCollection<Integer> output = p
         .apply(Create.of(1, 2, 3, 4, 5, 6, 7))
         .apply(Filter.by(new EvenFn()));
@@ -99,8 +97,6 @@ public class FilterTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testFilterLessThan() {
-    TestPipeline p = TestPipeline.create();
-
     PCollection<Integer> output = p
         .apply(Create.of(1, 2, 3, 4, 5, 6, 7))
         .apply(Filter.lessThan(4));
@@ -112,8 +108,6 @@ public class FilterTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testFilterGreaterThan() {
-    TestPipeline p = TestPipeline.create();
-
     PCollection<Integer> output = p
         .apply(Create.of(1, 2, 3, 4, 5, 6, 7))
         .apply(Filter.greaterThan(4));
@@ -125,8 +119,6 @@ public class FilterTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testFilterLessThanEq() {
-    TestPipeline p = TestPipeline.create();
-
     PCollection<Integer> output = p
         .apply(Create.of(1, 2, 3, 4, 5, 6, 7))
         .apply(Filter.lessThanEq(4));
@@ -138,8 +130,6 @@ public class FilterTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testFilterGreaterThanEq() {
-    TestPipeline p = TestPipeline.create();
-
     PCollection<Integer> output = p
         .apply(Create.of(1, 2, 3, 4, 5, 6, 7))
         .apply(Filter.greaterThanEq(4));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java
index bb2877e..b24071e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java
@@ -27,7 +27,6 @@ import java.io.Serializable;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -49,6 +48,9 @@ import org.junit.runners.JUnit4;
 public class FlatMapElementsTest implements Serializable {
 
   @Rule
+  public final transient TestPipeline pipeline = TestPipeline.create();
+
+  @Rule
   public transient ExpectedException thrown = ExpectedException.none();
 
   /**
@@ -57,7 +59,6 @@ public class FlatMapElementsTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testFlatMapBasic() throws Exception {
-    Pipeline pipeline = TestPipeline.create();
     PCollection<Integer> output = pipeline
         .apply(Create.of(1, 2, 3))
 
@@ -82,7 +83,6 @@ public class FlatMapElementsTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testFlatMapFnOutputTypeDescriptor() throws Exception {
-    Pipeline pipeline = TestPipeline.create();
     PCollection<String> output = pipeline
         .apply(Create.of("hello"))
         .apply(FlatMapElements.via(new SimpleFunction<String, Set<String>>() {
@@ -117,7 +117,8 @@ public class FlatMapElementsTest implements Serializable {
    */
   @Test
   public void testPolymorphicSimpleFunction() throws Exception {
-    Pipeline pipeline = TestPipeline.create();
+    pipeline.enableAbandonedNodeEnforcement(false);
+
     PCollection<Integer> output = pipeline
         .apply(Create.of(1, 2, 3))
 
@@ -168,7 +169,6 @@ public class FlatMapElementsTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testVoidValues() throws Exception {
-    Pipeline pipeline = TestPipeline.create();
     pipeline
         .apply(Create.of("hello"))
         .apply(WithKeys.<String, String>of("k"))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
index d4686a4..48251bc 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
@@ -65,6 +65,9 @@ import org.junit.runners.JUnit4;
 public class FlattenTest implements Serializable {
 
   @Rule
+  public final transient TestPipeline p = TestPipeline.create();
+
+  @Rule
   public transient ExpectedException thrown = ExpectedException.none();
 
 
@@ -74,8 +77,6 @@ public class FlattenTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testFlattenPCollectionList() {
-    Pipeline p = TestPipeline.create();
-
     List<List<String>> inputs = Arrays.asList(
       LINES, NO_LINES, LINES2, NO_LINES, LINES, NO_LINES);
 
@@ -90,8 +91,6 @@ public class FlattenTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testFlattenPCollectionListThenParDo() {
-    Pipeline p = TestPipeline.create();
-
     List<List<String>> inputs = Arrays.asList(
       LINES, NO_LINES, LINES2, NO_LINES, LINES, NO_LINES);
 
@@ -107,8 +106,6 @@ public class FlattenTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testFlattenPCollectionListEmpty() {
-    Pipeline p = TestPipeline.create();
-
     PCollection<String> output =
         PCollectionList.<String>empty(p)
         .apply(Flatten.<String>pCollections()).setCoder(StringUtf8Coder.of());
@@ -120,8 +117,6 @@ public class FlattenTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testFlattenInputMultipleCopies() {
-    Pipeline p = TestPipeline.create();
-
     int count = 5;
     PCollection<Long> longs = p.apply("mkLines", CountingInput.upTo(count));
     PCollection<Long> biggerLongs =
@@ -154,8 +149,6 @@ public class FlattenTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testEmptyFlattenAsSideInput() {
-    Pipeline p = TestPipeline.create();
-
     final PCollectionView<Iterable<String>> view =
         PCollectionList.<String>empty(p)
         .apply(Flatten.<String>pCollections()).setCoder(StringUtf8Coder.of())
@@ -179,9 +172,6 @@ public class FlattenTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testFlattenPCollectionListEmptyThenParDo() {
-
-    Pipeline p = TestPipeline.create();
-
     PCollection<String> output =
         PCollectionList.<String>empty(p)
         .apply(Flatten.<String>pCollections()).setCoder(StringUtf8Coder.of())
@@ -198,8 +188,6 @@ public class FlattenTest implements Serializable {
     thrown.expect(IllegalStateException.class);
     thrown.expectMessage("cannot provide a Coder for empty");
 
-    Pipeline p = TestPipeline.create();
-
     PCollectionList.<ClassWithoutCoder>empty(p)
         .apply(Flatten.<ClassWithoutCoder>pCollections());
 
@@ -211,8 +199,6 @@ public class FlattenTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testFlattenIterables() {
-    Pipeline p = TestPipeline.create();
-
     PCollection<Iterable<String>> input = p
         .apply(Create.<Iterable<String>>of(LINES)
             .withCoder(IterableCoder.of(StringUtf8Coder.of())));
@@ -229,8 +215,6 @@ public class FlattenTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testFlattenIterablesLists() {
-    Pipeline p = TestPipeline.create();
-
     PCollection<List<String>> input =
         p.apply(Create.<List<String>>of(LINES).withCoder(ListCoder.of(StringUtf8Coder.of())));
 
@@ -244,8 +228,6 @@ public class FlattenTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testFlattenIterablesSets() {
-    Pipeline p = TestPipeline.create();
-
     Set<String> linesSet = ImmutableSet.copyOf(LINES);
 
     PCollection<Set<String>> input =
@@ -261,9 +243,6 @@ public class FlattenTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testFlattenIterablesCollections() {
-
-    Pipeline p = TestPipeline.create();
-
     Set<String> linesSet = ImmutableSet.copyOf(LINES);
 
     PCollection<Collection<String>> input =
@@ -280,8 +259,6 @@ public class FlattenTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testFlattenIterablesEmpty() {
-    Pipeline p = TestPipeline.create();
-
     PCollection<Iterable<String>> input = p
         .apply(Create.<Iterable<String>>of(NO_LINES)
             .withCoder(IterableCoder.of(StringUtf8Coder.of())));
@@ -300,8 +277,6 @@ public class FlattenTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testEqualWindowFnPropagation() {
-    Pipeline p = TestPipeline.create();
-
     PCollection<String> input1 =
         p.apply("CreateInput1", Create.of("Input1"))
         .apply("Window1", Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));
@@ -322,8 +297,6 @@ public class FlattenTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testCompatibleWindowFnPropagation() {
-    Pipeline p = TestPipeline.create();
-
     PCollection<String> input1 =
         p.apply("CreateInput1", Create.of("Input1"))
         .apply("Window1",
@@ -345,7 +318,7 @@ public class FlattenTest implements Serializable {
 
   @Test
   public void testIncompatibleWindowFnPropagationFailure() {
-    Pipeline p = TestPipeline.create();
+    p.enableAbandonedNodeEnforcement(false);
 
     PCollection<String> input1 =
         p.apply("CreateInput1", Create.of("Input1"))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
index ebde110..f4bec3a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
@@ -39,7 +39,6 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.Coder;
@@ -81,6 +80,9 @@ import org.junit.runners.JUnit4;
 public class GroupByKeyTest {
 
   @Rule
+  public final TestPipeline p = TestPipeline.create();
+
+  @Rule
   public ExpectedException thrown = ExpectedException.none();
 
   @Test
@@ -95,8 +97,6 @@ public class GroupByKeyTest {
         KV.of("k2", -33),
         KV.of("k3", 0));
 
-    Pipeline p = TestPipeline.create();
-
     PCollection<KV<String, Integer>> input =
         p.apply(Create.of(ungroupedPairs)
             .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())));
@@ -137,8 +137,6 @@ public class GroupByKeyTest {
         KV.of("k2", -33),  // window [5, 10)
         KV.of("k3", 0));  // window [5, 10)
 
-    Pipeline p = TestPipeline.create();
-
     PCollection<KV<String, Integer>> input =
         p.apply(Create.timestamped(ungroupedPairs, Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L))
             .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())));
@@ -174,8 +172,6 @@ public class GroupByKeyTest {
   public void testGroupByKeyEmpty() {
     List<KV<String, Integer>> ungroupedPairs = Arrays.asList();
 
-    Pipeline p = TestPipeline.create();
-
     PCollection<KV<String, Integer>> input =
         p.apply(Create.of(ungroupedPairs)
             .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())));
@@ -193,8 +189,6 @@ public class GroupByKeyTest {
 
     List<KV<Map<String, String>, Integer>> ungroupedPairs = Arrays.asList();
 
-    Pipeline p = TestPipeline.create();
-
     PCollection<KV<Map<String, String>, Integer>> input =
         p.apply(Create.of(ungroupedPairs)
             .withCoder(
@@ -209,7 +203,6 @@ public class GroupByKeyTest {
   @Test
   @Category(NeedsRunner.class)
   public void testIdentityWindowFnPropagation() {
-    Pipeline p = TestPipeline.create();
 
     List<KV<String, Integer>> ungroupedPairs = Arrays.asList();
 
@@ -230,7 +223,6 @@ public class GroupByKeyTest {
   @Test
   @Category(NeedsRunner.class)
   public void testWindowFnInvalidation() {
-    Pipeline p = TestPipeline.create();
 
     List<KV<String, Integer>> ungroupedPairs = Arrays.asList();
 
@@ -255,7 +247,6 @@ public class GroupByKeyTest {
 
   @Test
   public void testInvalidWindowsDirect() {
-    Pipeline p = TestPipeline.create();
 
     List<KV<String, Integer>> ungroupedPairs = Arrays.asList();
 
@@ -275,7 +266,6 @@ public class GroupByKeyTest {
   @Test
   @Category(NeedsRunner.class)
   public void testRemerge() {
-    Pipeline p = TestPipeline.create();
 
     List<KV<String, Integer>> ungroupedPairs = Arrays.asList();
 
@@ -300,7 +290,6 @@ public class GroupByKeyTest {
 
   @Test
   public void testGroupByKeyDirectUnbounded() {
-    Pipeline p = TestPipeline.create();
 
     PCollection<KV<String, Integer>> input =
         p.apply(
@@ -331,9 +320,8 @@ public class GroupByKeyTest {
   @Test
   @Category(RunnableOnService.class)
   public void testOutputTimeFnEarliest() {
-    Pipeline pipeline = TestPipeline.create();
 
-    pipeline.apply(
+    p.apply(
         Create.timestamped(
             TimestampedValue.of(KV.of(0, "hello"), new Instant(0)),
             TimestampedValue.of(KV.of(0, "goodbye"), new Instant(10))))
@@ -342,7 +330,7 @@ public class GroupByKeyTest {
         .apply(GroupByKey.<Integer, String>create())
         .apply(ParDo.of(new AssertTimestamp(new Instant(0))));
 
-    pipeline.run();
+    p.run();
   }
 
 
@@ -353,9 +341,7 @@ public class GroupByKeyTest {
   @Test
   @Category(RunnableOnService.class)
   public void testOutputTimeFnLatest() {
-    Pipeline pipeline = TestPipeline.create();
-
-    pipeline.apply(
+    p.apply(
         Create.timestamped(
             TimestampedValue.of(KV.of(0, "hello"), new Instant(0)),
             TimestampedValue.of(KV.of(0, "goodbye"), new Instant(10))))
@@ -364,7 +350,7 @@ public class GroupByKeyTest {
         .apply(GroupByKey.<Integer, String>create())
         .apply(ParDo.of(new AssertTimestamp(new Instant(10))));
 
-    pipeline.run();
+    p.run();
   }
 
   private static class AssertTimestamp<K, V> extends DoFn<KV<K, V>, Void> {
@@ -408,8 +394,6 @@ public class GroupByKeyTest {
     final int numValues = 10;
     final int numKeys = 5;
 
-    Pipeline p = TestPipeline.create();
-
     p.getCoderRegistry().registerCoder(BadEqualityKey.class, DeterministicKeyCoder.class);
 
     // construct input data

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/KeysTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/KeysTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/KeysTest.java
index fce5b2f..2a19802 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/KeysTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/KeysTest.java
@@ -18,7 +18,6 @@
 package org.apache.beam.sdk.transforms;
 
 import java.util.Arrays;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -27,6 +26,7 @@ import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -50,11 +50,12 @@ public class KeysTest {
   static final KV<String, Integer>[] EMPTY_TABLE = new KV[] {
   };
 
+  @Rule
+  public final TestPipeline p = TestPipeline.create();
+
   @Test
   @Category(RunnableOnService.class)
   public void testKeys() {
-    Pipeline p = TestPipeline.create();
-
     PCollection<KV<String, Integer>> input =
         p.apply(Create.of(Arrays.asList(TABLE)).withCoder(
             KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())));
@@ -69,8 +70,6 @@ public class KeysTest {
   @Test
   @Category(RunnableOnService.class)
   public void testKeysEmpty() {
-    Pipeline p = TestPipeline.create();
-
     PCollection<KV<String, Integer>> input =
         p.apply(Create.of(Arrays.asList(EMPTY_TABLE)).withCoder(
             KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/KvSwapTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/KvSwapTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/KvSwapTest.java
index 3598198..24186ed 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/KvSwapTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/KvSwapTest.java
@@ -18,7 +18,6 @@
 package org.apache.beam.sdk.transforms;
 
 import java.util.Arrays;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -27,6 +26,7 @@ import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -50,11 +50,12 @@ public class KvSwapTest {
   static final KV<String, Integer>[] EMPTY_TABLE = new KV[] {
   };
 
+  @Rule
+  public final TestPipeline p = TestPipeline.create();
+
   @Test
   @Category(RunnableOnService.class)
   public void testKvSwap() {
-    Pipeline p = TestPipeline.create();
-
     PCollection<KV<String, Integer>> input =
         p.apply(Create.of(Arrays.asList(TABLE)).withCoder(
             KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())));
@@ -75,8 +76,6 @@ public class KvSwapTest {
   @Test
   @Category(RunnableOnService.class)
   public void testKvSwapEmpty() {
-    Pipeline p = TestPipeline.create();
-
     PCollection<KV<String, Integer>> input =
         p.apply(Create.of(Arrays.asList(EMPTY_TABLE)).withCoder(
             KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestTest.java
index ce9ae37..f71b813 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestTest.java
@@ -51,12 +51,13 @@ import org.junit.runners.JUnit4;
  */
 @RunWith(JUnit4.class)
 public class LatestTest implements Serializable {
+
+  @Rule public final transient TestPipeline p = TestPipeline.create();
   @Rule public transient ExpectedException thrown = ExpectedException.none();
 
   @Test
   @Category(NeedsRunner.class)
   public void testGloballyEventTimestamp() {
-    TestPipeline p = TestPipeline.create();
     PCollection<String> output =
         p.apply(Create.timestamped(
             TimestampedValue.of("foo", new Instant(100)),
@@ -71,7 +72,8 @@ public class LatestTest implements Serializable {
 
   @Test
   public void testGloballyOutputCoder() {
-    TestPipeline p = TestPipeline.create();
+    p.enableAbandonedNodeEnforcement(false);
+
     BigEndianLongCoder inputCoder = BigEndianLongCoder.of();
 
     PCollection<Long> output =
@@ -86,7 +88,6 @@ public class LatestTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testGloballyEmptyCollection() {
-    TestPipeline p = TestPipeline.create();
     PCollection<Long> emptyInput = p.apply(Create.<Long>of()
         // Explicitly set coder such that then runner enforces encodability.
         .withCoder(VarLongCoder.of()));
@@ -99,7 +100,6 @@ public class LatestTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testPerKeyEventTimestamp() {
-    TestPipeline p = TestPipeline.create();
     PCollection<KV<String, String>> output =
         p.apply(Create.timestamped(
             TimestampedValue.of(KV.of("A", "foo"), new Instant(100)),
@@ -114,7 +114,8 @@ public class LatestTest implements Serializable {
 
   @Test
   public void testPerKeyOutputCoder() {
-    TestPipeline p = TestPipeline.create();
+    p.enableAbandonedNodeEnforcement(false);
+
     KvCoder<String, Long> inputCoder = KvCoder.of(
         AvroCoder.of(String.class), AvroCoder.of(Long.class));
 
@@ -128,7 +129,6 @@ public class LatestTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testPerKeyEmptyCollection() {
-    TestPipeline p = TestPipeline.create();
     PCollection<KV<String, String>> output =
         p.apply(Create.<KV<String, String>>of().withCoder(KvCoder.of(
             StringUtf8Coder.of(), StringUtf8Coder.of())))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
index ac3444b..47d0b87 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
@@ -24,7 +24,6 @@ import static org.junit.Assert.assertThat;
 
 import java.io.Serializable;
 import java.util.Set;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
@@ -48,6 +47,9 @@ import org.junit.runners.JUnit4;
 public class MapElementsTest implements Serializable {
 
   @Rule
+  public final transient TestPipeline pipeline = TestPipeline.create();
+
+  @Rule
   public transient ExpectedException thrown = ExpectedException.none();
 
   /**
@@ -79,7 +81,6 @@ public class MapElementsTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testMapBasic() throws Exception {
-    Pipeline pipeline = TestPipeline.create();
     PCollection<Integer> output = pipeline
         .apply(Create.of(1, 2, 3))
         .apply(MapElements.via(new SimpleFunction<Integer, Integer>() {
@@ -98,7 +99,8 @@ public class MapElementsTest implements Serializable {
    */
   @Test
   public void testPolymorphicSimpleFunction() throws Exception {
-    Pipeline pipeline = TestPipeline.create();
+    pipeline.enableAbandonedNodeEnforcement(false);
+
     PCollection<Integer> output = pipeline
         .apply(Create.of(1, 2, 3))
 
@@ -120,7 +122,8 @@ public class MapElementsTest implements Serializable {
    */
   @Test
   public void testNestedPolymorphicSimpleFunction() throws Exception {
-    Pipeline pipeline = TestPipeline.create();
+    pipeline.enableAbandonedNodeEnforcement(false);
+
     PCollection<Integer> output =
         pipeline
             .apply(Create.of(1, 2, 3))
@@ -149,7 +152,6 @@ public class MapElementsTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testMapBasicSerializableFunction() throws Exception {
-    Pipeline pipeline = TestPipeline.create();
     PCollection<Integer> output = pipeline
         .apply(Create.of(1, 2, 3))
         .apply(MapElements.via(new SerializableFunction<Integer, Integer>() {
@@ -170,7 +172,6 @@ public class MapElementsTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testSimpleFunctionOutputTypeDescriptor() throws Exception {
-    Pipeline pipeline = TestPipeline.create();
     PCollection<String> output = pipeline
         .apply(Create.of("hello"))
         .apply(MapElements.via(new SimpleFunction<String, String>() {
@@ -191,7 +192,6 @@ public class MapElementsTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testVoidValues() throws Exception {
-    Pipeline pipeline = TestPipeline.create();
     pipeline
         .apply(Create.of("hello"))
         .apply(WithKeys.<String, String>of("k"))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java
index 9bc8a64..2c3a735 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java
@@ -33,6 +33,7 @@ import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -43,10 +44,13 @@ import org.junit.runners.JUnit4;
  */
 @RunWith(JUnit4.class)
 public class ParDoLifecycleTest implements Serializable {
+
+  @Rule
+  public final transient TestPipeline p = TestPipeline.create();
+
   @Test
   @Category(RunnableOnService.class)
   public void testOldFnCallSequence() {
-    TestPipeline p = TestPipeline.create();
     PCollectionList.of(p.apply("Impolite", Create.of(1, 2, 4)))
         .and(p.apply("Polite", Create.of(3, 5, 6, 7)))
         .apply(Flatten.<Integer>pCollections())
@@ -58,7 +62,6 @@ public class ParDoLifecycleTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testOldFnCallSequenceMulti() {
-    TestPipeline p = TestPipeline.create();
     PCollectionList.of(p.apply("Impolite", Create.of(1, 2, 4)))
         .and(p.apply("Polite", Create.of(3, 5, 6, 7)))
         .apply(Flatten.<Integer>pCollections())
@@ -127,7 +130,6 @@ public class ParDoLifecycleTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testFnCallSequence() {
-    TestPipeline p = TestPipeline.create();
     PCollectionList.of(p.apply("Impolite", Create.of(1, 2, 4)))
         .and(p.apply("Polite", Create.of(3, 5, 6, 7)))
         .apply(Flatten.<Integer>pCollections())
@@ -139,7 +141,6 @@ public class ParDoLifecycleTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testFnCallSequenceMulti() {
-    TestPipeline p = TestPipeline.create();
     PCollectionList.of(p.apply("Impolite", Create.of(1, 2, 4)))
         .and(p.apply("Polite", Create.of(3, 5, 6, 7)))
         .apply(Flatten.<Integer>pCollections())
@@ -206,7 +207,6 @@ public class ParDoLifecycleTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testTeardownCalledAfterExceptionInSetup() {
-    TestPipeline p = TestPipeline.create();
     ExceptionThrowingOldFn fn = new ExceptionThrowingOldFn(MethodForException.SETUP);
     p
         .apply(Create.of(1, 2, 3))
@@ -227,7 +227,6 @@ public class ParDoLifecycleTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testTeardownCalledAfterExceptionInStartBundle() {
-    TestPipeline p = TestPipeline.create();
     ExceptionThrowingOldFn fn = new ExceptionThrowingOldFn(MethodForException.START_BUNDLE);
     p
         .apply(Create.of(1, 2, 3))
@@ -246,7 +245,6 @@ public class ParDoLifecycleTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testTeardownCalledAfterExceptionInProcessElement() {
-    TestPipeline p = TestPipeline.create();
     ExceptionThrowingOldFn fn = new ExceptionThrowingOldFn(MethodForException.PROCESS_ELEMENT);
     p
         .apply(Create.of(1, 2, 3))
@@ -265,7 +263,6 @@ public class ParDoLifecycleTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testTeardownCalledAfterExceptionInFinishBundle() {
-    TestPipeline p = TestPipeline.create();
     ExceptionThrowingOldFn fn = new ExceptionThrowingOldFn(MethodForException.FINISH_BUNDLE);
     p
         .apply(Create.of(1, 2, 3))
@@ -284,7 +281,6 @@ public class ParDoLifecycleTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testWithContextTeardownCalledAfterExceptionInSetup() {
-    TestPipeline p = TestPipeline.create();
     ExceptionThrowingOldFn fn = new ExceptionThrowingOldFn(MethodForException.SETUP);
     p.apply(Create.of(1, 2, 3)).apply(ParDo.of(fn));
     try {
@@ -300,7 +296,6 @@ public class ParDoLifecycleTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testWithContextTeardownCalledAfterExceptionInStartBundle() {
-    TestPipeline p = TestPipeline.create();
     ExceptionThrowingOldFn fn = new ExceptionThrowingOldFn(MethodForException.START_BUNDLE);
     p.apply(Create.of(1, 2, 3)).apply(ParDo.of(fn));
     try {
@@ -316,7 +311,6 @@ public class ParDoLifecycleTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testWithContextTeardownCalledAfterExceptionInProcessElement() {
-    TestPipeline p = TestPipeline.create();
     ExceptionThrowingOldFn fn = new ExceptionThrowingOldFn(MethodForException.PROCESS_ELEMENT);
     p.apply(Create.of(1, 2, 3)).apply(ParDo.of(fn));
     try {
@@ -332,7 +326,6 @@ public class ParDoLifecycleTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testWithContextTeardownCalledAfterExceptionInFinishBundle() {
-    TestPipeline p = TestPipeline.create();
     ExceptionThrowingOldFn fn = new ExceptionThrowingOldFn(MethodForException.FINISH_BUNDLE);
     p.apply(Create.of(1, 2, 3)).apply(ParDo.of(fn));
     try {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 4a3e2dd..3a47fc7 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -47,7 +47,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.VarIntCoder;
@@ -107,6 +106,9 @@ public class ParDoTest implements Serializable {
   // anonymous inner classes inside the non-static test methods.
 
   @Rule
+  public final transient TestPipeline pipeline = TestPipeline.create();
+
+  @Rule
   public transient ExpectedException thrown = ExpectedException.none();
 
   private static class PrintingDoFn extends DoFn<String, String> {
@@ -302,7 +304,6 @@ public class ParDoTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testParDo() {
-    Pipeline pipeline = TestPipeline.create();
 
     List<Integer> inputs = Arrays.asList(3, -42, 666);
 
@@ -319,7 +320,6 @@ public class ParDoTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testParDo2() {
-    Pipeline pipeline = TestPipeline.create();
 
     List<Integer> inputs = Arrays.asList(3, -42, 666);
 
@@ -336,7 +336,6 @@ public class ParDoTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testParDoEmpty() {
-    Pipeline pipeline = TestPipeline.create();
 
     List<Integer> inputs = Arrays.asList();
 
@@ -354,8 +353,6 @@ public class ParDoTest implements Serializable {
   @Category(RunnableOnService.class)
   public void testParDoEmptyOutputs() {
 
-    Pipeline pipeline = TestPipeline.create();
-
     List<Integer> inputs = Arrays.asList();
 
     PCollection<String> output = pipeline
@@ -370,7 +367,6 @@ public class ParDoTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testParDoWithSideOutputs() {
-    Pipeline pipeline = TestPipeline.create();
 
     List<Integer> inputs = Arrays.asList(3, -42, 666);
 
@@ -413,7 +409,6 @@ public class ParDoTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testParDoEmptyWithSideOutputs() {
-    Pipeline pipeline = TestPipeline.create();
 
     List<Integer> inputs = Arrays.asList();
 
@@ -454,7 +449,6 @@ public class ParDoTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testParDoWithEmptySideOutputs() {
-    Pipeline pipeline = TestPipeline.create();
 
     List<Integer> inputs = Arrays.asList();
 
@@ -482,7 +476,6 @@ public class ParDoTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testParDoWithOnlySideOutputs() {
-    Pipeline pipeline = TestPipeline.create();
 
     List<Integer> inputs = Arrays.asList(3, -42, 666);
 
@@ -507,7 +500,6 @@ public class ParDoTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testParDoWritingToUndeclaredSideOutput() {
-    Pipeline pipeline = TestPipeline.create();
 
     List<Integer> inputs = Arrays.asList(3, -42, 666);
 
@@ -529,7 +521,7 @@ public class ParDoTest implements Serializable {
   // TODO: The exception thrown is runner-specific, even if the behavior is general
   @Category(NeedsRunner.class)
   public void testParDoUndeclaredSideOutputLimit() {
-    Pipeline pipeline = TestPipeline.create();
+
     PCollection<Integer> input = pipeline.apply(Create.of(Arrays.asList(3)));
 
     // Success for a total of 1000 outputs.
@@ -566,7 +558,6 @@ public class ParDoTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testParDoWithSideInputs() {
-    Pipeline pipeline = TestPipeline.create();
 
     List<Integer> inputs = Arrays.asList(3, -42, 666);
 
@@ -598,7 +589,6 @@ public class ParDoTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testParDoWithSideInputsIsCumulative() {
-    Pipeline pipeline = TestPipeline.create();
 
     List<Integer> inputs = Arrays.asList(3, -42, 666);
 
@@ -632,7 +622,6 @@ public class ParDoTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testMultiOutputParDoWithSideInputs() {
-    Pipeline pipeline = TestPipeline.create();
 
     List<Integer> inputs = Arrays.asList(3, -42, 666);
 
@@ -670,7 +659,6 @@ public class ParDoTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testMultiOutputParDoWithSideInputsIsCumulative() {
-    Pipeline pipeline = TestPipeline.create();
 
     List<Integer> inputs = Arrays.asList(3, -42, 666);
 
@@ -708,7 +696,6 @@ public class ParDoTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testParDoReadingFromUnknownSideInput() {
-    Pipeline pipeline = TestPipeline.create();
 
     List<Integer> inputs = Arrays.asList(3, -42, 666);
 
@@ -746,7 +733,6 @@ public class ParDoTest implements Serializable {
     // on an input where the element is in multiple windows. The complication is
     // that side inputs are per-window, so the runner has to make sure
     // to process each window individually.
-    Pipeline p = TestPipeline.create();
 
     MutableDateTime mutableNow = Instant.now().toMutableDateTime();
     mutableNow.setMillisOfSecond(0);
@@ -754,9 +740,9 @@ public class ParDoTest implements Serializable {
 
     SlidingWindows windowFn =
         SlidingWindows.of(Duration.standardSeconds(5)).every(Duration.standardSeconds(1));
-    PCollectionView<Integer> view = p.apply(Create.of(1)).apply(View.<Integer>asSingleton());
+    PCollectionView<Integer> view = pipeline.apply(Create.of(1)).apply(View.<Integer>asSingleton());
     PCollection<String> res =
-        p.apply(Create.timestamped(TimestampedValue.of("a", now)))
+        pipeline.apply(Create.timestamped(TimestampedValue.of("a", now)))
             .apply(Window.<String>into(windowFn))
             .apply(ParDo.of(new FnWithSideInputs(view)).withSideInputs(view));
 
@@ -766,14 +752,12 @@ public class ParDoTest implements Serializable {
       PAssert.that(res).inWindow(window).containsInAnyOrder("a:1");
     }
 
-    p.run();
+    pipeline.run();
   }
 
   @Test
   @Category(NeedsRunner.class)
   public void testParDoWithErrorInStartBatch() {
-    Pipeline pipeline = TestPipeline.create();
-
     List<Integer> inputs = Arrays.asList(3, -42, 666);
 
     pipeline.apply(Create.of(inputs))
@@ -787,7 +771,6 @@ public class ParDoTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testParDoWithErrorInProcessElement() {
-    Pipeline pipeline = TestPipeline.create();
 
     List<Integer> inputs = Arrays.asList(3, -42, 666);
 
@@ -802,7 +785,6 @@ public class ParDoTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testParDoWithErrorInFinishBatch() {
-    Pipeline pipeline = TestPipeline.create();
 
     List<Integer> inputs = Arrays.asList(3, -42, 666);
 
@@ -816,23 +798,27 @@ public class ParDoTest implements Serializable {
 
   @Test
   public void testParDoOutputNameBasedOnDoFnWithTrimmedSuffix() {
-    Pipeline p = TestPipeline.create();
-    PCollection<String> output = p.apply(Create.of(1)).apply(ParDo.of(new TestDoFn()));
+    pipeline.enableAbandonedNodeEnforcement(false);
+
+    PCollection<String> output = pipeline.apply(Create.of(1)).apply(ParDo.of(new TestDoFn()));
     assertThat(output.getName(), containsString("ParDo(Test)"));
   }
 
   @Test
   public void testParDoOutputNameBasedOnLabel() {
-    Pipeline p = TestPipeline.create();
+    pipeline.enableAbandonedNodeEnforcement(false);
+
     PCollection<String> output =
-        p.apply(Create.of(1)).apply("MyParDo", ParDo.of(new TestDoFn()));
+        pipeline.apply(Create.of(1)).apply("MyParDo", ParDo.of(new TestDoFn()));
     assertThat(output.getName(), containsString("MyParDo"));
   }
 
   @Test
   public void testParDoOutputNameBasedDoFnWithoutMatchingSuffix() {
-    Pipeline p = TestPipeline.create();
-    PCollection<String> output = p.apply(Create.of(1)).apply(ParDo.of(new StrangelyNamedDoer()));
+    pipeline.enableAbandonedNodeEnforcement(false);
+
+    PCollection<String> output =
+        pipeline.apply(Create.of(1)).apply(ParDo.of(new StrangelyNamedDoer()));
     assertThat(output.getName(), containsString("ParDo(StrangelyNamedDoer)"));
   }
 
@@ -850,7 +836,7 @@ public class ParDoTest implements Serializable {
 
   @Test
   public void testParDoWithSideOutputsName() {
-    Pipeline p = TestPipeline.create();
+    pipeline.enableAbandonedNodeEnforcement(false);
 
     TupleTag<String> mainOutputTag = new TupleTag<String>("main"){};
     TupleTag<String> sideOutputTag1 = new TupleTag<String>("side1"){};
@@ -858,7 +844,7 @@ public class ParDoTest implements Serializable {
     TupleTag<String> sideOutputTag3 = new TupleTag<String>("side3"){};
     TupleTag<String> sideOutputTagUnwritten = new TupleTag<String>("sideUnwritten"){};
 
-    PCollectionTuple outputs = p
+    PCollectionTuple outputs = pipeline
         .apply(Create.of(Arrays.asList(3, -42, 666))).setName("MyInput")
         .apply("MyParDo", ParDo
                .of(new TestDoFn(
@@ -880,7 +866,6 @@ public class ParDoTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testParDoInCustomTransform() {
-    Pipeline pipeline = TestPipeline.create();
 
     List<Integer> inputs = Arrays.asList(3, -42, 666);
 
@@ -904,7 +889,6 @@ public class ParDoTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testMultiOutputChaining() {
-    Pipeline pipeline = TestPipeline.create();
 
     PCollectionTuple filters = pipeline
         .apply(Create.of(Arrays.asList(3, 4, 5, 6)))
@@ -1106,7 +1090,7 @@ public class ParDoTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testSideOutputUnknownCoder() throws Exception {
-    Pipeline pipeline = TestPipeline.create();
+
     PCollection<Integer> input = pipeline
         .apply(Create.of(Arrays.asList(1, 2, 3)));
 
@@ -1122,7 +1106,8 @@ public class ParDoTest implements Serializable {
 
   @Test
   public void testSideOutputUnregisteredExplicitCoder() throws Exception {
-    Pipeline pipeline = TestPipeline.create();
+    pipeline.enableAbandonedNodeEnforcement(false);
+
     PCollection<Integer> input = pipeline
         .apply(Create.of(Arrays.asList(1, 2, 3)));
 
@@ -1144,7 +1129,7 @@ public class ParDoTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testMainOutputUnregisteredExplicitCoder() {
-    Pipeline pipeline = TestPipeline.create();
+
     PCollection<Integer> input = pipeline
         .apply(Create.of(Arrays.asList(1, 2, 3)));
 
@@ -1165,7 +1150,6 @@ public class ParDoTest implements Serializable {
     // should not cause a crash based on lack of a coder for the
     // side output.
 
-    Pipeline pipeline = TestPipeline.create();
     final TupleTag<TestDummy> mainOutputTag = new TupleTag<TestDummy>("main");
     final TupleTag<TestDummy> sideOutputTag = new TupleTag<TestDummy>("side");
     PCollectionTuple tuple = pipeline
@@ -1204,7 +1188,6 @@ public class ParDoTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testParDoOutputWithTimestamp() {
-    Pipeline pipeline = TestPipeline.create();
 
     PCollection<Integer> input =
         pipeline.apply(Create.of(Arrays.asList(3, 42, 6)));
@@ -1226,7 +1209,6 @@ public class ParDoTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testParDoSideOutputWithTimestamp() {
-    Pipeline pipeline = TestPipeline.create();
 
     PCollection<Integer> input =
         pipeline.apply(Create.of(Arrays.asList(3, 42, 6)));
@@ -1258,7 +1240,6 @@ public class ParDoTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testParDoShiftTimestamp() {
-    Pipeline pipeline = TestPipeline.create();
 
     PCollection<Integer> input =
         pipeline.apply(Create.of(Arrays.asList(3, 42, 6)));
@@ -1281,7 +1262,6 @@ public class ParDoTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testParDoShiftTimestampInvalid() {
-    Pipeline pipeline = TestPipeline.create();
 
     pipeline.apply(Create.of(Arrays.asList(3, 42, 6)))
         .apply(ParDo.of(new TestOutputTimestampDoFn()))
@@ -1300,7 +1280,6 @@ public class ParDoTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testParDoShiftTimestampInvalidZeroAllowed() {
-    Pipeline pipeline = TestPipeline.create();
 
     pipeline.apply(Create.of(Arrays.asList(3, 42, 6)))
         .apply(ParDo.of(new TestOutputTimestampDoFn()))
@@ -1353,7 +1332,6 @@ public class ParDoTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testWindowingInStartAndFinishBundle() {
-    Pipeline pipeline = TestPipeline.create();
 
     PCollection<String> output =
         pipeline
@@ -1391,7 +1369,6 @@ public class ParDoTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testWindowingInStartBundleException() {
-    Pipeline pipeline = TestPipeline.create();
 
     pipeline
         .apply(Create.timestamped(TimestampedValue.of("elem", new Instant(1))))
@@ -1477,13 +1454,12 @@ public class ParDoTest implements Serializable {
           }
         };
 
-    Pipeline p = TestPipeline.create();
     PCollection<Integer> output =
-        p.apply(Create.of(KV.of("hello", 42), KV.of("hello", 97), KV.of("hello", 84)))
+        pipeline.apply(Create.of(KV.of("hello", 42), KV.of("hello", 97), KV.of("hello", 84)))
             .apply(ParDo.of(fn));
 
     PAssert.that(output).containsInAnyOrder(0, 1, 2);
-    p.run();
+    pipeline.run();
   }
 
   @Test
@@ -1514,9 +1490,8 @@ public class ParDoTest implements Serializable {
           }
         };
 
-    Pipeline p = TestPipeline.create();
     PCollectionTuple output =
-        p.apply(
+        pipeline.apply(
                 Create.of(
                     KV.of("hello", 42),
                     KV.of("hello", 97),
@@ -1534,7 +1509,7 @@ public class ParDoTest implements Serializable {
 
     // There are 1 and 3 from "hello" and just "1" from "goodbye"
     PAssert.that(odds).containsInAnyOrder(1, 3, 1);
-    p.run();
+    pipeline.run();
   }
 
   @Test
@@ -1562,24 +1537,23 @@ public class ParDoTest implements Serializable {
           }
         };
 
-    Pipeline p = TestPipeline.create();
     PCollection<List<Integer>> output =
-        p.apply(
+        pipeline.apply(
                 Create.of(
                     KV.of("hello", 97), KV.of("hello", 42), KV.of("hello", 84), KV.of("hello", 12)))
             .apply(ParDo.of(fn));
 
     PAssert.that(output).containsInAnyOrder(Lists.newArrayList(12, 42, 84, 97));
-    p.run();
+    pipeline.run();
   }
 
   @Test
   @Category({RunnableOnService.class, UsesStatefulParDo.class})
   public void testBagStateSideInput() {
-    Pipeline p = TestPipeline.create();
 
     final PCollectionView<List<Integer>> listView =
-        p.apply("Create list for side input", Create.of(2, 1, 0)).apply(View.<Integer>asList());
+        pipeline
+            .apply("Create list for side input", Create.of(2, 1, 0)).apply(View.<Integer>asList());
 
     final String stateId = "foo";
     DoFn<KV<String, Integer>, List<Integer>> fn =
@@ -1607,7 +1581,7 @@ public class ParDoTest implements Serializable {
         };
 
     PCollection<List<Integer>> output =
-        p.apply(
+        pipeline.apply(
                 "Create main input",
                 Create.of(
                     KV.of("hello", 97), KV.of("hello", 42), KV.of("hello", 84), KV.of("hello", 12)))
@@ -1616,7 +1590,7 @@ public class ParDoTest implements Serializable {
     PAssert.that(output).containsInAnyOrder(
         Lists.newArrayList(12, 42, 84, 97),
         Lists.newArrayList(0, 1, 2));
-    p.run();
+    pipeline.run();
   }
 
   /**
@@ -1658,11 +1632,9 @@ public class ParDoTest implements Serializable {
           }
         };
 
-    Pipeline p = TestPipeline.create();
-
-    PCollection<Integer> output = p.apply(Create.of(KV.of("hello", 37))).apply(ParDo.of(fn));
+    PCollection<Integer> output = pipeline.apply(Create.of(KV.of("hello", 37))).apply(ParDo.of(fn));
     PAssert.that(output).containsInAnyOrder(3, 42);
-    p.run();
+    pipeline.run();
   }
 
   @Test
@@ -1704,7 +1676,6 @@ public class ParDoTest implements Serializable {
 
   @Test
   public void testRejectsWrongWindowType() {
-    Pipeline p = TestPipeline.create();
 
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage(GlobalWindow.class.getSimpleName());
@@ -1712,7 +1683,8 @@ public class ParDoTest implements Serializable {
     thrown.expectMessage("window type");
     thrown.expectMessage("not a supertype");
 
-    p.apply(Create.of(1, 2, 3))
+    pipeline
+        .apply(Create.of(1, 2, 3))
         .apply(
             ParDo.of(
                 new DoFn<Integer, Integer>() {
@@ -1735,9 +1707,8 @@ public class ParDoTest implements Serializable {
   public void testMultipleWindowSubtypesOK() {
     final String timerId = "gobbledegook";
 
-    Pipeline p = TestPipeline.create();
-
-    p.apply(Create.of(1, 2, 3))
+    pipeline
+        .apply(Create.of(1, 2, 3))
         .apply(Window.<Integer>into(FixedWindows.of(Duration.standardSeconds(10))))
         .apply(
             ParDo.of(
@@ -1759,26 +1730,25 @@ public class ParDoTest implements Serializable {
   public void testRejectsSplittableDoFnByDefault() {
     // ParDo with a splittable DoFn must be overridden by the runner.
     // Without an override, applying it directly must fail.
-    Pipeline p = TestPipeline.create();
 
     thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage(p.getRunner().getClass().getName());
+    thrown.expectMessage(pipeline.getRunner().getClass().getName());
     thrown.expectMessage("does not support Splittable DoFn");
 
-    p.apply(Create.of(1, 2, 3)).apply(ParDo.of(new TestSplittableDoFn()));
+    pipeline.apply(Create.of(1, 2, 3)).apply(ParDo.of(new TestSplittableDoFn()));
   }
 
   @Test
   public void testMultiRejectsSplittableDoFnByDefault() {
     // ParDo with a splittable DoFn must be overridden by the runner.
     // Without an override, applying it directly must fail.
-    Pipeline p = TestPipeline.create();
 
     thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage(p.getRunner().getClass().getName());
+    thrown.expectMessage(pipeline.getRunner().getClass().getName());
     thrown.expectMessage("does not support Splittable DoFn");
 
-    p.apply(Create.of(1, 2, 3))
+    pipeline
+        .apply(Create.of(1, 2, 3))
         .apply(
             ParDo.of(new TestSplittableDoFn())
                 .withOutputTags(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PartitionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PartitionTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PartitionTest.java
index 1cbe344..87d7460 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PartitionTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PartitionTest.java
@@ -25,7 +25,6 @@ import static org.junit.Assert.assertTrue;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
@@ -47,6 +46,7 @@ import org.junit.runners.JUnit4;
 @RunWith(JUnit4.class)
 public class PartitionTest implements Serializable {
 
+  @Rule public final transient TestPipeline pipeline = TestPipeline.create();
   @Rule public transient ExpectedException thrown = ExpectedException.none();
 
   static class ModFn implements PartitionFn<Integer> {
@@ -63,10 +63,10 @@ public class PartitionTest implements Serializable {
     }
   }
 
+
   @Test
   @Category(RunnableOnService.class)
   public void testEvenOddPartition() {
-    Pipeline pipeline = TestPipeline.create();
 
     PCollectionList<Integer> outputs = pipeline
         .apply(Create.of(591, 11789, 1257, 24578, 24799, 307))
@@ -81,7 +81,6 @@ public class PartitionTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testModPartition() {
-    Pipeline pipeline = TestPipeline.create();
 
     PCollectionList<Integer> outputs = pipeline
         .apply(Create.of(1, 2, 4, 5))
@@ -96,7 +95,6 @@ public class PartitionTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testOutOfBoundsPartitions() {
-    Pipeline pipeline = TestPipeline.create();
 
     pipeline
     .apply(Create.of(-1))
@@ -110,7 +108,6 @@ public class PartitionTest implements Serializable {
 
   @Test
   public void testZeroNumPartitions() {
-    Pipeline pipeline = TestPipeline.create();
 
     PCollection<Integer> input = pipeline.apply(Create.of(591));
 
@@ -122,7 +119,6 @@ public class PartitionTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testDroppedPartition() {
-    Pipeline pipeline = TestPipeline.create();
 
     // Compute the set of integers either 1 or 2 mod 3, the hard way.
     PCollectionList<Integer> outputs = pipeline

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RegexTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RegexTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RegexTest.java
index 6e196b4..cd707da 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RegexTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RegexTest.java
@@ -25,6 +25,7 @@ import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -33,11 +34,13 @@ import org.junit.runners.JUnit4;
 /** Tests for {@link Regex}. */
 @RunWith(JUnit4.class)
 public class RegexTest implements Serializable {
+
+  @Rule
+  public final transient TestPipeline p = TestPipeline.create();
+
   @Test
   @Category(NeedsRunner.class)
   public void testFind() {
-    TestPipeline p = TestPipeline.create();
-
     PCollection<String> output =
         p.apply(Create.of("aj", "xj", "yj", "zj")).apply(Regex.find("[xyz]"));
 
@@ -48,8 +51,6 @@ public class RegexTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testFindGroup() {
-    TestPipeline p = TestPipeline.create();
-
     PCollection<String> output =
         p.apply(Create.of("aj", "xj", "yj", "zj")).apply(Regex.find("([xyz])", 1));
 
@@ -60,8 +61,6 @@ public class RegexTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testFindNone() {
-    TestPipeline p = TestPipeline.create();
-
     PCollection<String> output = p.apply(Create.of("a", "b", "c", "d")).apply(Regex.find("[xyz]"));
 
     PAssert.that(output).empty();
@@ -71,7 +70,6 @@ public class RegexTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testKVFind() {
-    TestPipeline p = TestPipeline.create();
 
     PCollection<KV<String, String>> output =
         p.apply(Create.of("a b c")).apply(Regex.findKV("a (b) (c)", 1, 2));
@@ -83,7 +81,6 @@ public class RegexTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testKVFindNone() {
-    TestPipeline p = TestPipeline.create();
 
     PCollection<KV<String, String>> output =
         p.apply(Create.of("x y z")).apply(Regex.findKV("a (b) (c)", 1, 2));
@@ -95,7 +92,6 @@ public class RegexTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testMatches() {
-    TestPipeline p = TestPipeline.create();
 
     PCollection<String> output =
         p.apply(Create.of("a", "x", "y", "z")).apply(Regex.matches("[xyz]"));
@@ -107,7 +103,6 @@ public class RegexTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testMatchesNone() {
-    TestPipeline p = TestPipeline.create();
 
     PCollection<String> output =
         p.apply(Create.of("a", "b", "c", "d")).apply(Regex.matches("[xyz]"));
@@ -119,7 +114,6 @@ public class RegexTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testMatchesGroup() {
-    TestPipeline p = TestPipeline.create();
 
     PCollection<String> output =
         p.apply(Create.of("a", "x xxx", "x yyy", "x zzz")).apply(Regex.matches("x ([xyz]*)", 1));
@@ -131,7 +125,6 @@ public class RegexTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testKVMatches() {
-    TestPipeline p = TestPipeline.create();
 
     PCollection<KV<String, String>> output =
         p.apply(Create.of("a b c")).apply(Regex.matchesKV("a (b) (c)", 1, 2));
@@ -143,7 +136,6 @@ public class RegexTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testKVMatchesNone() {
-    TestPipeline p = TestPipeline.create();
 
     PCollection<KV<String, String>> output =
         p.apply(Create.of("x y z")).apply(Regex.matchesKV("a (b) (c)", 1, 2));
@@ -154,7 +146,6 @@ public class RegexTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testReplaceAll() {
-    TestPipeline p = TestPipeline.create();
 
     PCollection<String> output =
         p.apply(Create.of("xj", "yj", "zj")).apply(Regex.replaceAll("[xyz]", "new"));
@@ -166,7 +157,6 @@ public class RegexTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testReplaceAllMixed() {
-    TestPipeline p = TestPipeline.create();
 
     PCollection<String> output =
         p.apply(Create.of("abc", "xj", "yj", "zj", "def")).apply(Regex.replaceAll("[xyz]", "new"));
@@ -178,7 +168,6 @@ public class RegexTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testReplaceFirst() {
-    TestPipeline p = TestPipeline.create();
 
     PCollection<String> output =
         p.apply(Create.of("xjx", "yjy", "zjz")).apply(Regex.replaceFirst("[xyz]", "new"));
@@ -190,7 +179,6 @@ public class RegexTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testReplaceFirstMixed() {
-    TestPipeline p = TestPipeline.create();
 
     PCollection<String> output =
         p.apply(Create.of("abc", "xjx", "yjy", "zjz", "def"))
@@ -203,7 +191,6 @@ public class RegexTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testSplits() {
-    TestPipeline p = TestPipeline.create();
 
     PCollection<String> output =
         p.apply(Create.of("The  quick   brown fox jumps over    the lazy dog"))
@@ -217,7 +204,6 @@ public class RegexTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testSplitsWithEmpty() {
-    TestPipeline p = TestPipeline.create();
 
     PCollection<String> output =
         p.apply(Create.of("The  quick   brown fox jumps over    the lazy dog"))
@@ -235,7 +221,6 @@ public class RegexTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testSplitsWithoutEmpty() {
-    TestPipeline p = TestPipeline.create();
 
     PCollection<String> output =
         p.apply(Create.of("The  quick   brown fox jumps over    the lazy dog"))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
index a0555fa..9cc12d4 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
@@ -32,6 +32,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
+
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -41,6 +42,7 @@ import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.PCollection;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -102,71 +104,70 @@ public class SampleTest {
     }
   }
 
+  @Rule
+  public final transient TestPipeline pipeline = TestPipeline.create();
+
   @Test
   @Category(RunnableOnService.class)
   public void testSample() {
-    Pipeline p = TestPipeline.create();
 
-    PCollection<Integer> input = p.apply(Create.of(DATA)
+    PCollection<Integer> input = pipeline.apply(Create.of(DATA)
         .withCoder(BigEndianIntegerCoder.of()));
     PCollection<Iterable<Integer>> output = input.apply(
         Sample.<Integer>fixedSizeGlobally(3));
 
     PAssert.thatSingletonIterable(output)
         .satisfies(new VerifyCorrectSample<>(3, DATA));
-    p.run();
+    pipeline.run();
   }
 
   @Test
   @Category(RunnableOnService.class)
   public void testSampleEmpty() {
-    Pipeline p = TestPipeline.create();
 
-    PCollection<Integer> input = p.apply(Create.of(EMPTY)
+    PCollection<Integer> input = pipeline.apply(Create.of(EMPTY)
         .withCoder(BigEndianIntegerCoder.of()));
     PCollection<Iterable<Integer>> output = input.apply(
         Sample.<Integer>fixedSizeGlobally(3));
 
     PAssert.thatSingletonIterable(output)
         .satisfies(new VerifyCorrectSample<>(0, EMPTY));
-    p.run();
+    pipeline.run();
   }
 
   @Test
   @Category(RunnableOnService.class)
   public void testSampleZero() {
-    Pipeline p = TestPipeline.create();
 
-    PCollection<Integer> input = p.apply(Create.of(DATA)
+    PCollection<Integer> input = pipeline.apply(Create.of(DATA)
         .withCoder(BigEndianIntegerCoder.of()));
     PCollection<Iterable<Integer>> output = input.apply(
         Sample.<Integer>fixedSizeGlobally(0));
 
     PAssert.thatSingletonIterable(output)
         .satisfies(new VerifyCorrectSample<>(0, DATA));
-    p.run();
+    pipeline.run();
   }
 
   @Test
   @Category(RunnableOnService.class)
   public void testSampleInsufficientElements() {
-    Pipeline p = TestPipeline.create();
 
-    PCollection<Integer> input = p.apply(Create.of(DATA)
+    PCollection<Integer> input = pipeline.apply(Create.of(DATA)
         .withCoder(BigEndianIntegerCoder.of()));
     PCollection<Iterable<Integer>> output = input.apply(
         Sample.<Integer>fixedSizeGlobally(10));
 
     PAssert.thatSingletonIterable(output)
         .satisfies(new VerifyCorrectSample<>(5, DATA));
-    p.run();
+    pipeline.run();
   }
 
   @Test(expected = IllegalArgumentException.class)
   public void testSampleNegative() {
-    Pipeline p = TestPipeline.create();
+    pipeline.enableAbandonedNodeEnforcement(false);
 
-    PCollection<Integer> input = p.apply(Create.of(DATA)
+    PCollection<Integer> input = pipeline.apply(Create.of(DATA)
         .withCoder(BigEndianIntegerCoder.of()));
     input.apply(Sample.<Integer>fixedSizeGlobally(-1));
   }
@@ -174,9 +175,8 @@ public class SampleTest {
   @Test
   @Category(RunnableOnService.class)
   public void testSampleMultiplicity() {
-    Pipeline p = TestPipeline.create();
 
-    PCollection<Integer> input = p.apply(Create.of(REPEATED_DATA)
+    PCollection<Integer> input = pipeline.apply(Create.of(REPEATED_DATA)
         .withCoder(BigEndianIntegerCoder.of()));
     // At least one value must be selected with multiplicity.
     PCollection<Iterable<Integer>> output = input.apply(
@@ -184,7 +184,7 @@ public class SampleTest {
 
     PAssert.thatSingletonIterable(output)
         .satisfies(new VerifyCorrectSample<>(6, REPEATED_DATA));
-    p.run();
+    pipeline.run();
   }
 
   private static class VerifyAnySample implements SerializableFunction<Iterable<String>, Void> {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
index 022c2e5..e3b58b7 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
@@ -29,7 +29,6 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -54,6 +53,7 @@ import org.apache.beam.sdk.values.TupleTagList;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.joda.time.MutableDateTime;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -151,10 +151,13 @@ public class SplittableDoFnTest {
     }
   }
 
+  @Rule
+  public final transient TestPipeline p = TestPipeline.create();
+
   @Test
   @Category({RunnableOnService.class, UsesSplittableParDo.class})
   public void testPairWithIndexBasic() {
-    Pipeline p = TestPipeline.create();
+
     PCollection<KV<String, Integer>> res =
         p.apply(Create.of("a", "bb", "ccccc"))
             .apply(ParDo.of(new PairStringWithIndexToLength()))
@@ -180,7 +183,6 @@ public class SplittableDoFnTest {
   public void testPairWithIndexWindowedTimestamped() {
     // Tests that Splittable DoFn correctly propagates windowing strategy, windows and timestamps
     // of elements in the input collection.
-    Pipeline p = TestPipeline.create();
 
     MutableDateTime mutableNow = Instant.now().toMutableDateTime();
     mutableNow.setMillisOfSecond(0);
@@ -277,7 +279,6 @@ public class SplittableDoFnTest {
   @Test
   @Category({RunnableOnService.class, UsesSplittableParDo.class})
   public void testOutputAfterCheckpoint() throws Exception {
-    Pipeline p = TestPipeline.create();
     PCollection<Integer> outputs = p.apply(Create.of("foo"))
         .apply(ParDo.of(new SDFWithMultipleOutputsPerBlock()));
     PAssert.thatSingleton(outputs.apply(Count.<Integer>globally()))
@@ -317,7 +318,6 @@ public class SplittableDoFnTest {
   @Test
   @Category({RunnableOnService.class, UsesSplittableParDo.class})
   public void testSideInputsAndOutputs() throws Exception {
-    Pipeline p = TestPipeline.create();
 
     PCollectionView<String> sideInput =
         p.apply("side input", Create.of("foo")).apply(View.<String>asSingleton());
@@ -344,7 +344,6 @@ public class SplittableDoFnTest {
   @Test
   @Category({RunnableOnService.class, UsesSplittableParDo.class})
   public void testLateData() throws Exception {
-    Pipeline p = TestPipeline.create();
 
     Instant base = Instant.now();
 
@@ -439,7 +438,6 @@ public class SplittableDoFnTest {
   @Test
   @Category({RunnableOnService.class, UsesSplittableParDo.class})
   public void testLifecycleMethods() throws Exception {
-    Pipeline p = TestPipeline.create();
 
     PCollection<String> res =
         p.apply(Create.of("a", "b", "c")).apply(ParDo.of(new SDFWithLifecycle()));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java
index b624252..d011197 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java
@@ -53,6 +53,9 @@ import org.junit.runners.JUnit4;
 public class TopTest {
 
   @Rule
+  public final TestPipeline p = TestPipeline.create();
+
+  @Rule
   public ExpectedException expectedEx = ExpectedException.none();
 
   @SuppressWarnings("unchecked")
@@ -93,7 +96,6 @@ public class TopTest {
   @Category(NeedsRunner.class)
   @SuppressWarnings("unchecked")
   public void testTop() {
-    Pipeline p = TestPipeline.create();
     PCollection<String> input =
         p.apply(Create.of(Arrays.asList(COLLECTION))
                  .withCoder(StringUtf8Coder.of()));
@@ -125,7 +127,6 @@ public class TopTest {
   @Category(NeedsRunner.class)
   @SuppressWarnings("unchecked")
   public void testTopEmpty() {
-    Pipeline p = TestPipeline.create();
     PCollection<String> input =
         p.apply(Create.of(Arrays.asList(EMPTY_COLLECTION))
                  .withCoder(StringUtf8Coder.of()));
@@ -151,7 +152,8 @@ public class TopTest {
 
   @Test
   public void testTopEmptyWithIncompatibleWindows() {
-    Pipeline p = TestPipeline.create();
+    p.enableAbandonedNodeEnforcement(false);
+
     Bound<String> windowingFn = Window.<String>into(FixedWindows.of(Duration.standardDays(10L)));
     PCollection<String> input =
         p.apply(Create.timestamped(Collections.<String>emptyList(), Collections.<Long>emptyList()))
@@ -170,7 +172,6 @@ public class TopTest {
   @Category(NeedsRunner.class)
   @SuppressWarnings("unchecked")
   public void testTopZero() {
-    Pipeline p = TestPipeline.create();
     PCollection<String> input =
         p.apply(Create.of(Arrays.asList(COLLECTION))
                  .withCoder(StringUtf8Coder.of()));
@@ -202,7 +203,8 @@ public class TopTest {
   // This is a purely compile-time test.  If the code compiles, then it worked.
   @Test
   public void testPerKeySerializabilityRequirement() {
-    Pipeline p = TestPipeline.create();
+    p.enableAbandonedNodeEnforcement(false);
+
     p.apply("CreateCollection", Create.of(Arrays.asList(COLLECTION))
         .withCoder(StringUtf8Coder.of()));
 
@@ -218,7 +220,8 @@ public class TopTest {
 
   @Test
   public void testCountConstraint() {
-    Pipeline p = TestPipeline.create();
+    p.enableAbandonedNodeEnforcement(false);
+
     PCollection<String> input =
         p.apply(Create.of(Arrays.asList(COLLECTION))
             .withCoder(StringUtf8Coder.of()));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ValuesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ValuesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ValuesTest.java
index 0bf2e2e..5e27552 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ValuesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ValuesTest.java
@@ -20,7 +20,6 @@ package org.apache.beam.sdk.transforms;
 import static org.junit.Assert.assertEquals;
 
 import java.util.Arrays;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -29,6 +28,7 @@ import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -52,10 +52,12 @@ public class ValuesTest {
   static final KV<String, Integer>[] EMPTY_TABLE = new KV[] {
   };
 
+  @Rule
+  public final TestPipeline p = TestPipeline.create();
+
   @Test
   @Category(RunnableOnService.class)
   public void testValues() {
-    Pipeline p = TestPipeline.create();
 
     PCollection<KV<String, Integer>> input =
         p.apply(Create.of(Arrays.asList(TABLE)).withCoder(
@@ -72,7 +74,6 @@ public class ValuesTest {
   @Test
   @Category(RunnableOnService.class)
   public void testValuesEmpty() {
-    Pipeline p = TestPipeline.create();
 
     PCollection<KV<String, Integer>> input =
         p.apply(Create.of(Arrays.asList(EMPTY_TABLE)).withCoder(


Mime
View raw message