beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [2/4] beam git commit: Port Java modules from RunnableOnService to ValidatesRunner
Date Mon, 27 Mar 2017 18:15:25 GMT
http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
index 73cedfd..3443847 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
@@ -47,8 +47,8 @@ import org.apache.beam.sdk.coders.MapCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.InvalidWindows;
@@ -86,7 +86,7 @@ public class GroupByKeyTest {
   public ExpectedException thrown = ExpectedException.none();
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testGroupByKey() {
     List<KV<String, Integer>> ungroupedPairs = Arrays.asList(
         KV.of("k1", 3),
@@ -126,7 +126,7 @@ public class GroupByKeyTest {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testGroupByKeyAndWindows() {
     List<KV<String, Integer>> ungroupedPairs = Arrays.asList(
         KV.of("k1", 3),  // window [0, 5)
@@ -168,7 +168,7 @@ public class GroupByKeyTest {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testGroupByKeyEmpty() {
     List<KV<String, Integer>> ungroupedPairs = Arrays.asList();
 
@@ -318,7 +318,7 @@ public class GroupByKeyTest {
    * the two values.
    */
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testOutputTimeFnEarliest() {
 
     p.apply(
@@ -339,7 +339,7 @@ public class GroupByKeyTest {
    * with the windowing function customized to use the latest value.
    */
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testOutputTimeFnLatest() {
     p.apply(
         Create.timestamped(
@@ -389,7 +389,7 @@ public class GroupByKeyTest {
    * and not the value itself.
    */
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testGroupByKeyWithBadEqualsHashCode() throws Exception {
     final int numValues = 10;
     final int numKeys = 5;

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/KeysTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/KeysTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/KeysTest.java
index 2a19802..dafb953 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/KeysTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/KeysTest.java
@@ -22,8 +22,8 @@ import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.junit.Rule;
@@ -54,7 +54,7 @@ public class KeysTest {
   public final TestPipeline p = TestPipeline.create();
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testKeys() {
     PCollection<KV<String, Integer>> input =
         p.apply(Create.of(Arrays.asList(TABLE)).withCoder(
@@ -68,7 +68,7 @@ public class KeysTest {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testKeysEmpty() {
     PCollection<KV<String, Integer>> input =
         p.apply(Create.of(Arrays.asList(EMPTY_TABLE)).withCoder(

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/KvSwapTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/KvSwapTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/KvSwapTest.java
index 859312f..762451f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/KvSwapTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/KvSwapTest.java
@@ -23,8 +23,8 @@ import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.NullableCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.junit.Rule;
@@ -56,7 +56,7 @@ public class KvSwapTest {
   public final TestPipeline p = TestPipeline.create();
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testKvSwap() {
     PCollection<KV<String, Integer>> input =
         p.apply(Create.of(Arrays.asList(TABLE)).withCoder(
@@ -77,7 +77,7 @@ public class KvSwapTest {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testKvSwapEmpty() {
     PCollection<KV<String, Integer>> input =
         p.apply(Create.of(Arrays.asList(EMPTY_TABLE)).withCoder(

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
index 47d0b87..82e856e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
@@ -26,8 +26,8 @@ import java.io.Serializable;
 import java.util.Set;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
 import org.apache.beam.sdk.values.KV;
@@ -250,7 +250,7 @@ public class MapElementsTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testPrimitiveDisplayData() {
     SimpleFunction<Integer, ?> mapFn = new SimpleFunction<Integer, Integer>() {
       @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java
index 2c3a735..fda8947 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java
@@ -28,8 +28,8 @@ import static org.junit.Assert.fail;
 import java.io.Serializable;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.beam.sdk.testing.NeedsRunner;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
@@ -49,7 +49,7 @@ public class ParDoLifecycleTest implements Serializable {
   public final transient TestPipeline p = TestPipeline.create();
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testOldFnCallSequence() {
     PCollectionList.of(p.apply("Impolite", Create.of(1, 2, 4)))
         .and(p.apply("Polite", Create.of(3, 5, 6, 7)))
@@ -60,7 +60,7 @@ public class ParDoLifecycleTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testOldFnCallSequenceMulti() {
     PCollectionList.of(p.apply("Impolite", Create.of(1, 2, 4)))
         .and(p.apply("Polite", Create.of(3, 5, 6, 7)))
@@ -128,7 +128,7 @@ public class ParDoLifecycleTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testFnCallSequence() {
     PCollectionList.of(p.apply("Impolite", Create.of(1, 2, 4)))
         .and(p.apply("Polite", Create.of(3, 5, 6, 7)))
@@ -139,7 +139,7 @@ public class ParDoLifecycleTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testFnCallSequenceMulti() {
     PCollectionList.of(p.apply("Impolite", Create.of(1, 2, 4)))
         .and(p.apply("Polite", Create.of(3, 5, 6, 7)))

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index e0fa02c..336f4c0 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -62,7 +62,6 @@ import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.io.CountingInput;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestStream;
 import org.apache.beam.sdk.testing.UsesMapState;
@@ -70,6 +69,7 @@ import org.apache.beam.sdk.testing.UsesSetState;
 import org.apache.beam.sdk.testing.UsesStatefulParDo;
 import org.apache.beam.sdk.testing.UsesTestStream;
 import org.apache.beam.sdk.testing.UsesTimersInParDo;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.DoFn.OnTimer;
 import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
 import org.apache.beam.sdk.transforms.ParDo.Bound;
@@ -320,7 +320,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testParDo() {
 
     List<Integer> inputs = Arrays.asList(3, -42, 666);
@@ -336,7 +336,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testParDo2() {
 
     List<Integer> inputs = Arrays.asList(3, -42, 666);
@@ -352,7 +352,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testParDoEmpty() {
 
     List<Integer> inputs = Arrays.asList();
@@ -368,7 +368,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testParDoEmptyOutputs() {
 
     List<Integer> inputs = Arrays.asList();
@@ -383,7 +383,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testParDoWithSideOutputs() {
 
     List<Integer> inputs = Arrays.asList(3, -42, 666);
@@ -425,7 +425,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testParDoEmptyWithSideOutputs() {
     TupleTag<String> mainOutputTag = new TupleTag<String>("main"){};
     TupleTag<String> sideOutputTag1 = new TupleTag<String>("side1"){};
@@ -463,7 +463,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testParDoWithEmptySideOutputs() {
     TupleTag<String> mainOutputTag = new TupleTag<String>("main"){};
     TupleTag<String> sideOutputTag1 = new TupleTag<String>("side1"){};
@@ -487,7 +487,7 @@ public class ParDoTest implements Serializable {
 
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testParDoWithOnlySideOutputs() {
 
     List<Integer> inputs = Arrays.asList(3, -42, 666);
@@ -569,7 +569,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testParDoWithSideInputs() {
 
     List<Integer> inputs = Arrays.asList(3, -42, 666);
@@ -600,7 +600,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testParDoWithSideInputsIsCumulative() {
 
     List<Integer> inputs = Arrays.asList(3, -42, 666);
@@ -633,7 +633,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testMultiOutputParDoWithSideInputs() {
 
     List<Integer> inputs = Arrays.asList(3, -42, 666);
@@ -670,7 +670,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testMultiOutputParDoWithSideInputsIsCumulative() {
 
     List<Integer> inputs = Arrays.asList(3, -42, 666);
@@ -740,7 +740,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testSideInputsWithMultipleWindows() {
     // Tests that the runner can safely run a DoFn that uses side inputs
     // on an input where the element is in multiple windows. The complication is
@@ -908,7 +908,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testParDoInCustomTransform() {
 
     List<Integer> inputs = Arrays.asList(3, -42, 666);
@@ -1283,7 +1283,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testParDoShiftTimestamp() {
 
     PCollection<Integer> input =
@@ -1346,7 +1346,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testParDoShiftTimestampUnlimited() {
     PCollection<Long> outputs =
         pipeline
@@ -1432,7 +1432,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testWindowingInStartAndFinishBundle() {
 
     PCollection<String> output =
@@ -1536,7 +1536,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
-  @Category({RunnableOnService.class, UsesStatefulParDo.class})
+  @Category({ValidatesRunner.class, UsesStatefulParDo.class})
   public void testValueStateSimple() {
     final String stateId = "foo";
 
@@ -1565,7 +1565,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
-  @Category({RunnableOnService.class, UsesStatefulParDo.class})
+  @Category({ValidatesRunner.class, UsesStatefulParDo.class})
   public void testValueStateDedup() {
     final String stateId = "foo";
 
@@ -1613,7 +1613,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
-  @Category({RunnableOnService.class, UsesStatefulParDo.class})
+  @Category({ValidatesRunner.class, UsesStatefulParDo.class})
   public void testValueStateFixedWindows() {
     final String stateId = "foo";
 
@@ -1661,7 +1661,7 @@ public class ParDoTest implements Serializable {
    * which may (or may not) be executed in similar contexts after runner optimizations.
    */
   @Test
-  @Category({RunnableOnService.class, UsesStatefulParDo.class})
+  @Category({ValidatesRunner.class, UsesStatefulParDo.class})
   public void testValueStateSameId() {
     final String stateId = "foo";
 
@@ -1711,7 +1711,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
-  @Category({RunnableOnService.class, UsesStatefulParDo.class})
+  @Category({ValidatesRunner.class, UsesStatefulParDo.class})
   public void testValueStateSideOutput() {
     final String stateId = "foo";
 
@@ -1761,7 +1761,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
-  @Category({RunnableOnService.class, UsesStatefulParDo.class})
+  @Category({ValidatesRunner.class, UsesStatefulParDo.class})
   public void testBagState() {
     final String stateId = "foo";
 
@@ -1796,7 +1796,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
-  @Category({RunnableOnService.class, UsesStatefulParDo.class, UsesSetState.class})
+  @Category({ValidatesRunner.class, UsesStatefulParDo.class, UsesSetState.class})
   public void testSetState() {
     final String stateId = "foo";
     final String countStateId = "count";
@@ -1838,7 +1838,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
-  @Category({RunnableOnService.class, UsesStatefulParDo.class, UsesMapState.class})
+  @Category({ValidatesRunner.class, UsesStatefulParDo.class, UsesMapState.class})
   public void testMapState() {
     final String stateId = "foo";
     final String countStateId = "count";
@@ -1883,7 +1883,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
-  @Category({RunnableOnService.class, UsesStatefulParDo.class})
+  @Category({ValidatesRunner.class, UsesStatefulParDo.class})
   public void testCombiningState() {
     final String stateId = "foo";
 
@@ -1922,7 +1922,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
-  @Category({RunnableOnService.class, UsesStatefulParDo.class})
+  @Category({ValidatesRunner.class, UsesStatefulParDo.class})
   public void testBagStateSideInput() {
 
     final PCollectionView<List<Integer>> listView =
@@ -1984,7 +1984,7 @@ public class ParDoTest implements Serializable {
    * and is only supported by the direct runner.
    */
   @Test
-  @Category({RunnableOnService.class, UsesTimersInParDo.class})
+  @Category({ValidatesRunner.class, UsesTimersInParDo.class})
   public void testEventTimeTimerBounded() throws Exception {
     final String timerId = "foo";
 
@@ -2012,7 +2012,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
-  @Category({RunnableOnService.class, UsesTimersInParDo.class})
+  @Category({ValidatesRunner.class, UsesTimersInParDo.class})
   public void testTimerReceivedInOriginalWindow() throws Exception {
     final String timerId = "foo";
 
@@ -2060,7 +2060,7 @@ public class ParDoTest implements Serializable {
    * supplementary output. The test is otherwise identical to {@link #testEventTimeTimerBounded()}.
    */
   @Test
-  @Category({RunnableOnService.class, UsesTimersInParDo.class})
+  @Category({ValidatesRunner.class, UsesTimersInParDo.class})
   public void testEventTimeTimerAbsolute() throws Exception {
     final String timerId = "foo";
 
@@ -2093,7 +2093,7 @@ public class ParDoTest implements Serializable {
    * implementations that may GC in ways not simply governed by the watermark.
    */
   @Test
-  @Category({RunnableOnService.class, UsesTimersInParDo.class})
+  @Category({ValidatesRunner.class, UsesTimersInParDo.class})
   public void testEventTimeTimerMultipleKeys() throws Exception {
     final String timerId = "foo";
     final String stateId = "sizzle";
@@ -2157,7 +2157,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
-  @Category({RunnableOnService.class, UsesTimersInParDo.class})
+  @Category({ValidatesRunner.class, UsesTimersInParDo.class})
   public void testAbsoluteProcessingTimeTimerRejected() throws Exception {
     final String timerId = "foo";
 
@@ -2186,7 +2186,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
-  @Category({RunnableOnService.class, UsesTimersInParDo.class})
+  @Category({ValidatesRunner.class, UsesTimersInParDo.class})
   public void testOutOfBoundsEventTimeTimer() throws Exception {
     final String timerId = "foo";
 

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PartitionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PartitionTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PartitionTest.java
index 87d7460..3d81b39 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PartitionTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PartitionTest.java
@@ -27,8 +27,8 @@ import java.util.ArrayList;
 import java.util.List;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Partition.PartitionFn;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.PCollection;
@@ -65,7 +65,7 @@ public class PartitionTest implements Serializable {
 
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testEvenOddPartition() {
 
     PCollectionList<Integer> outputs = pipeline

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
index 8e426c6..80f361f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
@@ -37,8 +37,8 @@ import org.apache.beam.sdk.TestUtils;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.PCollection;
 import org.junit.Rule;
@@ -163,7 +163,7 @@ public class SampleTest {
     }
 
     @Test
-    @Category(RunnableOnService.class)
+    @Category(ValidatesRunner.class)
     public void testPickAny() {
       runPickAnyTest(lines, limit);
     }
@@ -235,7 +235,7 @@ public class SampleTest {
     }
 
     @Test
-    @Category(RunnableOnService.class)
+    @Category(ValidatesRunner.class)
     public void testSample() {
 
       PCollection<Integer> input =
@@ -249,7 +249,7 @@ public class SampleTest {
     }
 
     @Test
-    @Category(RunnableOnService.class)
+    @Category(ValidatesRunner.class)
     public void testSampleEmpty() {
 
       PCollection<Integer> input = pipeline.apply(Create.empty(BigEndianIntegerCoder.of()));
@@ -262,7 +262,7 @@ public class SampleTest {
     }
 
     @Test
-    @Category(RunnableOnService.class)
+    @Category(ValidatesRunner.class)
     public void testSampleZero() {
 
       PCollection<Integer> input = pipeline.apply(Create.of(ImmutableList.copyOf(DATA))
@@ -276,7 +276,7 @@ public class SampleTest {
     }
 
     @Test
-    @Category(RunnableOnService.class)
+    @Category(ValidatesRunner.class)
     public void testSampleInsufficientElements() {
 
       PCollection<Integer> input =
@@ -301,7 +301,7 @@ public class SampleTest {
     }
 
     @Test
-    @Category(RunnableOnService.class)
+    @Category(ValidatesRunner.class)
     public void testSampleMultiplicity() {
 
       PCollection<Integer> input =

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
index fefccc4..acd5584 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
@@ -31,10 +31,10 @@ import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestStream;
 import org.apache.beam.sdk.testing.UsesSplittableParDo;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.DoFn.BoundedPerElement;
 import org.apache.beam.sdk.transforms.splittabledofn.OffsetRange;
 import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
@@ -105,7 +105,7 @@ public class SplittableDoFnTest {
   public final transient TestPipeline p = TestPipeline.create();
 
   @Test
-  @Category({RunnableOnService.class, UsesSplittableParDo.class})
+  @Category({ValidatesRunner.class, UsesSplittableParDo.class})
   public void testPairWithIndexBasic() {
 
     PCollection<KV<String, Integer>> res =
@@ -129,7 +129,7 @@ public class SplittableDoFnTest {
   }
 
   @Test
-  @Category({RunnableOnService.class, UsesSplittableParDo.class})
+  @Category({ValidatesRunner.class, UsesSplittableParDo.class})
   public void testPairWithIndexWindowedTimestamped() {
     // Tests that Splittable DoFn correctly propagates windowing strategy, windows and timestamps
     // of elements in the input collection.
@@ -227,7 +227,7 @@ public class SplittableDoFnTest {
   }
 
   @Test
-  @Category({RunnableOnService.class, UsesSplittableParDo.class})
+  @Category({ValidatesRunner.class, UsesSplittableParDo.class})
   public void testOutputAfterCheckpoint() throws Exception {
     PCollection<Integer> outputs = p.apply(Create.of("foo"))
         .apply(ParDo.of(new SDFWithMultipleOutputsPerBlock()));
@@ -266,7 +266,7 @@ public class SplittableDoFnTest {
   }
 
   @Test
-  @Category({RunnableOnService.class, UsesSplittableParDo.class})
+  @Category({ValidatesRunner.class, UsesSplittableParDo.class})
   public void testSideInputsAndOutputs() throws Exception {
 
     PCollectionView<String> sideInput =
@@ -292,7 +292,7 @@ public class SplittableDoFnTest {
   }
 
   @Test
-  @Category({RunnableOnService.class, UsesSplittableParDo.class})
+  @Category({ValidatesRunner.class, UsesSplittableParDo.class})
   public void testLateData() throws Exception {
 
     Instant base = Instant.now();
@@ -387,7 +387,7 @@ public class SplittableDoFnTest {
   }
 
   @Test
-  @Category({RunnableOnService.class, UsesSplittableParDo.class})
+  @Category({ValidatesRunner.class, UsesSplittableParDo.class})
   public void testLifecycleMethods() throws Exception {
 
     PCollection<String> res =

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ToStringTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ToStringTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ToStringTest.java
index ab446a4..5b9c2c8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ToStringTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ToStringTest.java
@@ -20,12 +20,11 @@ package org.apache.beam.sdk.transforms;
 
 import java.util.ArrayList;
 import java.util.Arrays;
-
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.junit.Rule;
@@ -43,7 +42,7 @@ public class ToStringTest {
   public final TestPipeline p = TestPipeline.create();
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testToStringOf() {
     Integer[] ints = {1, 2, 3, 4, 5};
     String[] strings = {"1", "2", "3", "4", "5"};
@@ -54,7 +53,7 @@ public class ToStringTest {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testToStringKV() {
     ArrayList<KV<String, Integer>> kvs = new ArrayList<>();
     kvs.add(KV.of("one", 1));
@@ -71,7 +70,7 @@ public class ToStringTest {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testToStringKVWithDelimiter() {
     ArrayList<KV<String, Integer>> kvs = new ArrayList<>();
     kvs.add(KV.of("one", 1));
@@ -88,7 +87,7 @@ public class ToStringTest {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testToStringIterable() {
     ArrayList<Iterable<String>> iterables = new ArrayList<>();
     iterables.add(Arrays.asList(new String[]{"one", "two", "three"}));
@@ -106,7 +105,7 @@ public class ToStringTest {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testToStringIterableWithDelimiter() {
     ArrayList<Iterable<String>> iterables = new ArrayList<>();
     iterables.add(Arrays.asList(new String[]{"one", "two", "three"}));

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ValuesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ValuesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ValuesTest.java
index 5e27552..e290771 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ValuesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ValuesTest.java
@@ -24,8 +24,8 @@ import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.junit.Rule;
@@ -56,7 +56,7 @@ public class ValuesTest {
   public final TestPipeline p = TestPipeline.create();
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testValues() {
 
     PCollection<KV<String, Integer>> input =
@@ -72,7 +72,7 @@ public class ValuesTest {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testValuesEmpty() {
 
     PCollection<KV<String, Integer>> input =

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
index ee8bf40..740d808 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
@@ -49,8 +49,8 @@ import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.InvalidWindows;
@@ -90,7 +90,7 @@ public class ViewTest implements Serializable {
   public transient ExpectedException thrown = ExpectedException.none();
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testSingletonSideInput() {
 
     final PCollectionView<Integer> view =
@@ -112,7 +112,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testWindowedSingletonSideInput() {
 
     final PCollectionView<Integer> view =
@@ -191,7 +191,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testListSideInput() {
 
     final PCollectionView<List<Integer>> view =
@@ -217,7 +217,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testWindowedListSideInput() {
 
     final PCollectionView<List<Integer>> view =
@@ -257,7 +257,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testEmptyListSideInput() throws Exception {
 
     final PCollectionView<List<Integer>> view =
@@ -283,7 +283,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testListSideInputIsImmutable() {
 
     final PCollectionView<List<Integer>> view =
@@ -328,7 +328,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testIterableSideInput() {
 
     final PCollectionView<Iterable<Integer>> view =
@@ -353,7 +353,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testWindowedIterableSideInput() {
 
     final PCollectionView<Iterable<Integer>> view =
@@ -392,7 +392,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testEmptyIterableSideInput() throws Exception {
 
     final PCollectionView<Iterable<Integer>> view =
@@ -417,7 +417,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testIterableSideInputIsImmutable() {
 
     final PCollectionView<Iterable<Integer>> view =
@@ -448,7 +448,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testMultimapSideInput() {
 
     final PCollectionView<Map<String, Iterable<Integer>>> view =
@@ -475,7 +475,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testMultimapAsEntrySetSideInput() {
 
     final PCollectionView<Map<String, Iterable<Integer>>> view =
@@ -526,7 +526,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testMultimapSideInputWithNonDeterministicKeyCoder() {
 
     final PCollectionView<Map<String, Iterable<Integer>>> view =
@@ -555,7 +555,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testWindowedMultimapSideInput() {
 
     final PCollectionView<Map<String, Iterable<Integer>>> view =
@@ -593,7 +593,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testWindowedMultimapAsEntrySetSideInput() {
 
     final PCollectionView<Map<String, Iterable<Integer>>> view =
@@ -635,7 +635,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testWindowedMultimapSideInputWithNonDeterministicKeyCoder() {
 
     final PCollectionView<Map<String, Iterable<Integer>>> view =
@@ -674,7 +674,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testEmptyMultimapSideInput() throws Exception {
 
     final PCollectionView<Map<String, Iterable<Integer>>> view =
@@ -702,7 +702,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testEmptyMultimapSideInputWithNonDeterministicKeyCoder() throws Exception {
 
     final PCollectionView<Map<String, Iterable<Integer>>> view =
@@ -732,7 +732,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testMultimapSideInputIsImmutable() {
 
     final PCollectionView<Map<String, Iterable<Integer>>> view =
@@ -779,7 +779,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testMapSideInput() {
 
     final PCollectionView<Map<String, Integer>> view =
@@ -805,7 +805,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testMapAsEntrySetSideInput() {
 
     final PCollectionView<Map<String, Integer>> view =
@@ -834,7 +834,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testMapSideInputWithNonDeterministicKeyCoder() {
 
     final PCollectionView<Map<String, Integer>> view =
@@ -862,7 +862,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testWindowedMapSideInput() {
 
     final PCollectionView<Map<String, Integer>> view =
@@ -899,7 +899,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testWindowedMapAsEntrySetSideInput() {
 
     final PCollectionView<Map<String, Integer>> view =
@@ -940,7 +940,7 @@ public class ViewTest implements Serializable {
 
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testWindowedMapSideInputWithNonDeterministicKeyCoder() {
 
     final PCollectionView<Map<String, Integer>> view =
@@ -979,7 +979,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testEmptyMapSideInput() throws Exception {
 
     final PCollectionView<Map<String, Integer>> view =
@@ -1008,7 +1008,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testEmptyMapSideInputWithNonDeterministicKeyCoder() throws Exception {
 
     final PCollectionView<Map<String, Integer>> view =
@@ -1071,7 +1071,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testMapSideInputIsImmutable() {
 
     final PCollectionView<Map<String, Integer>> view =
@@ -1117,7 +1117,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testCombinedMapSideInput() {
 
     final PCollectionView<Map<String, Integer>> view =
@@ -1143,7 +1143,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testWindowedSideInputFixedToFixed() {
 
     final PCollectionView<Integer> view =
@@ -1175,7 +1175,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testWindowedSideInputFixedToGlobal() {
 
     final PCollectionView<Integer> view =
@@ -1207,7 +1207,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testWindowedSideInputFixedToFixedWithDefault() {
 
     final PCollectionView<Integer> view =
@@ -1237,7 +1237,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testSideInputWithNullDefault() {
 
     final PCollectionView<Void> view =
@@ -1266,7 +1266,7 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testSideInputWithNestedIterables() {
     final PCollectionView<Iterable<Integer>> view1 =
         pipeline.apply("CreateVoid1", Create.of((Void) null).withCoder(VoidCoder.of()))

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java
index 67a2658..48e07ad 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java
@@ -23,8 +23,8 @@ import java.io.Serializable;
 import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.joda.time.Duration;
@@ -49,7 +49,7 @@ public class WithTimestampsTest implements Serializable {
   public transient ExpectedException thrown = ExpectedException.none();
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void withTimestampsShouldApplyTimestamps() {
 
     SerializableFunction<String, Instant> timestampFn =
@@ -120,7 +120,7 @@ public class WithTimestampsTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void withTimestampsBackwardsInTimeAndWithAllowedTimestampSkewShouldSucceed() {
 
     SerializableFunction<String, Instant> timestampFn =
@@ -195,7 +195,7 @@ public class WithTimestampsTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void withTimestampsWithNullFnShouldThrowOnConstruction() {
 
     SerializableFunction<String, Instant> timestampFn = null;

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
index 0e5c177..4e61f4e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
@@ -32,8 +32,8 @@ import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFnTester;
@@ -123,7 +123,7 @@ public class CoGroupByKeyTest implements Serializable {
   public final transient TestPipeline p = TestPipeline.create();
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testCoGroupByKeyGetOnly() {
     final TupleTag<String> tag1 = new TupleTag<>();
     final TupleTag<String> tag2 = new TupleTag<>();
@@ -260,7 +260,7 @@ public class CoGroupByKeyTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testCoGroupByKey() {
     final TupleTag<String> namesTag = new TupleTag<>();
     final TupleTag<String> addressesTag = new TupleTag<>();
@@ -451,7 +451,7 @@ public class CoGroupByKeyTest implements Serializable {
    */
   @SuppressWarnings("unchecked")
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testCoGroupByKeyHandleResults() {
     TupleTag<String> namesTag = new TupleTag<>();
     TupleTag<String> addressesTag = new TupleTag<>();
@@ -480,7 +480,7 @@ public class CoGroupByKeyTest implements Serializable {
    */
   @SuppressWarnings("unchecked")
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testCoGroupByKeyWithWindowing() {
     TupleTag<String> clicksTag = new TupleTag<>();
     TupleTag<String> purchasesTag = new TupleTag<>();

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
index 327b7a8..2bc8d86 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
@@ -42,8 +42,8 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.CountingInput;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
@@ -312,7 +312,7 @@ public class WindowTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testNoWindowFnDoesNotReassignWindows() {
     pipeline.enableAbandonedNodeEnforcement(true);
 
@@ -364,7 +364,7 @@ public class WindowTest implements Serializable {
    * with the windowing function default, the end of the window.
    */
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testOutputTimeFnDefault() {
     pipeline.enableAbandonedNodeEnforcement(true);
 
@@ -398,7 +398,7 @@ public class WindowTest implements Serializable {
    * with the windowing function customized to use the end of the window.
    */
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testOutputTimeFnEndOfWindow() {
     pipeline.enableAbandonedNodeEnforcement(true);
 
@@ -449,7 +449,7 @@ public class WindowTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testPrimitiveDisplayData() {
     FixedWindows windowFn = FixedWindows.of(Duration.standardHours(5));
     AfterWatermark.FromEndOfWindow triggerBuilder = AfterWatermark.pastEndOfWindow();

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
index b0976e4..a3f5352 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
@@ -25,8 +25,8 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -89,7 +89,7 @@ public class WindowingTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testPartitioningWindowing() {
     PCollection<String> input =
         p.apply(
@@ -114,7 +114,7 @@ public class WindowingTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testNonPartitioningWindowing() {
     PCollection<String> input =
         p.apply(
@@ -139,7 +139,7 @@ public class WindowingTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testMergingWindowing() {
     PCollection<String> input =
         p.apply(
@@ -160,7 +160,7 @@ public class WindowingTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testWindowPreservation() {
     PCollection<String> input1 = p.apply("Create12",
         Create.timestamped(

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReifyTimestampsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReifyTimestampsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReifyTimestampsTest.java
index b78de8e..2942efd 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReifyTimestampsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReifyTimestampsTest.java
@@ -23,8 +23,8 @@ import static org.junit.Assert.assertThat;
 
 import java.io.Serializable;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -48,7 +48,7 @@ public class ReifyTimestampsTest implements Serializable {
   @Rule public transient TestPipeline pipeline = TestPipeline.create();
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void inValuesSucceeds() {
     PCollection<KV<String, Integer>> timestamped =
         pipeline
@@ -76,7 +76,7 @@ public class ReifyTimestampsTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void extractFromValuesSucceeds() {
     PCollection<KV<String, TimestampedValue<Integer>>> preified =
         pipeline.apply(

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReshuffleTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReshuffleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReshuffleTest.java
index 81a6d82..eae465c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReshuffleTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReshuffleTest.java
@@ -28,8 +28,8 @@ import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -79,7 +79,7 @@ public class ReshuffleTest implements Serializable {
   public final transient TestPipeline pipeline = TestPipeline.create();
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testJustReshuffle() {
 
     PCollection<KV<String, Integer>> input = pipeline
@@ -103,7 +103,7 @@ public class ReshuffleTest implements Serializable {
    * {@link WindowingStrategy}.
    */
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testReshufflePreservesTimestamps() {
     PCollection<KV<String, TimestampedValue<String>>> input =
         pipeline
@@ -155,7 +155,7 @@ public class ReshuffleTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testReshuffleAfterSessionsAndGroupByKey() {
 
     PCollection<KV<String, Iterable<Integer>>> input = pipeline
@@ -178,7 +178,7 @@ public class ReshuffleTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testReshuffleAfterFixedWindowsAndGroupByKey() {
 
     PCollection<KV<String, Iterable<Integer>>> input = pipeline
@@ -201,7 +201,7 @@ public class ReshuffleTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testReshuffleAfterSlidingWindowsAndGroupByKey() {
 
     PCollection<KV<String, Iterable<Integer>>> input = pipeline
@@ -224,7 +224,7 @@ public class ReshuffleTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testReshuffleAfterFixedWindows() {
 
     PCollection<KV<String, Integer>> input = pipeline
@@ -247,7 +247,7 @@ public class ReshuffleTest implements Serializable {
 
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testReshuffleAfterSlidingWindows() {
 
     PCollection<KV<String, Integer>> input = pipeline

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
index 7d767cf..010d726 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
@@ -32,8 +32,8 @@ import java.util.Map;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.CountingInput;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.MapElements;
@@ -73,7 +73,7 @@ public final class PCollectionTupleTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testComposePCollectionTuple() {
     pipeline.enableAbandonedNodeEnforcement(true);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PDoneTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PDoneTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PDoneTest.java
index ba7477d..7c9d1d9 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PDoneTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PDoneTest.java
@@ -22,8 +22,8 @@ import static org.apache.beam.sdk.TestUtils.LINES;
 import java.io.File;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.testing.NeedsRunner;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.junit.Ignore;
@@ -79,7 +79,7 @@ public class PDoneTest {
   // transforms that contain no nested transforms.
   @Ignore
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testEmptyTransform() {
     p.begin().apply(new EmptyTransform());
 

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
index 60054c6..a14f480 100644
--- a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
+++ b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
@@ -32,9 +32,9 @@ import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.SourceTestUtils;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFnTester;
@@ -135,7 +135,7 @@ public class ElasticsearchIOTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testRead() throws Exception {
     ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, node.client());
 
@@ -152,7 +152,7 @@ public class ElasticsearchIOTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testReadWithQuery() throws Exception {
     ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, node.client());
 
@@ -179,7 +179,7 @@ public class ElasticsearchIOTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testWrite() throws Exception {
     List<String> data =
         ElasticSearchIOTestUtils.createDocuments(

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 4dcc8d5..35bb9c3 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -127,10 +127,10 @@ import org.apache.beam.sdk.testing.CoderProperties;
 import org.apache.beam.sdk.testing.ExpectedLogs;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.SourceTestUtils;
 import org.apache.beam.sdk.testing.SourceTestUtils.ExpectedSplitOutcome;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFnTester;
@@ -1295,8 +1295,8 @@ public class BigQueryIOTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
-  @Ignore("[BEAM-436] DirectRunner RunnableOnService tempLocation configuration insufficient")
+  @Category(ValidatesRunner.class)
+  @Ignore("[BEAM-436] DirectRunner ValidatesRunner tempLocation configuration insufficient")
   public void testTableSourcePrimitiveDisplayData() throws IOException, InterruptedException {
     DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
     BigQueryIO.Read read = BigQueryIO.read()
@@ -1312,8 +1312,8 @@ public class BigQueryIOTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
-  @Ignore("[BEAM-436] DirectRunner RunnableOnService tempLocation configuration insufficient")
+  @Category(ValidatesRunner.class)
+  @Ignore("[BEAM-436] DirectRunner ValidatesRunner tempLocation configuration insufficient")
   public void testQuerySourcePrimitiveDisplayData() throws IOException, InterruptedException {
     DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
     BigQueryIO.Read read = BigQueryIO.read()
@@ -1339,15 +1339,15 @@ public class BigQueryIOTest implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
-  @Ignore("[BEAM-436] DirectRunner RunnableOnService tempLocation configuration insufficient")
+  @Category(ValidatesRunner.class)
+  @Ignore("[BEAM-436] DirectRunner ValidatesRunner tempLocation configuration insufficient")
   public void testBatchWritePrimitiveDisplayData() throws IOException, InterruptedException {
     testWritePrimitiveDisplayData(/* streaming: */ false);
   }
 
   @Test
-  @Category(RunnableOnService.class)
-  @Ignore("[BEAM-436] DirectRunner RunnableOnService tempLocation configuration insufficient")
+  @Category(ValidatesRunner.class)
+  @Ignore("[BEAM-436] DirectRunner ValidatesRunner tempLocation configuration insufficient")
   public void testStreamingWritePrimitiveDisplayData() throws IOException, InterruptedException {
     testWritePrimitiveDisplayData(/* streaming: */ true);
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
index 879e30e..af27926 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
@@ -87,8 +87,8 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.DoFnTester;
 import org.apache.beam.sdk.transforms.DoFnTester.CloningBehavior;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -429,7 +429,7 @@ public class DatastoreV1Test {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testWritePrimitiveDisplayData() {
     DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
     PTransform<PCollection<Entity>, ?> write =
@@ -443,7 +443,7 @@ public class DatastoreV1Test {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testDeleteEntityPrimitiveDisplayData() {
     DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
     PTransform<PCollection<Entity>, ?> write =
@@ -457,7 +457,7 @@ public class DatastoreV1Test {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testDeleteKeyPrimitiveDisplayData() {
     DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
     PTransform<PCollection<Key>, ?> write =

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java
index 28ca5f7..da89aa2 100644
--- a/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java
+++ b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java
@@ -24,13 +24,12 @@ import java.net.ServerSocket;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Set;
-
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.Connection;
 import org.apache.beam.sdk.io.mqtt.MqttIO.Read;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.values.PCollection;
 import org.fusesource.hawtbuf.Buffer;
@@ -79,7 +78,7 @@ public class MqttIOTest {
   }
 
   @Test(timeout = 60 * 1000)
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testReadNoClientId() throws Exception {
     final String topicName = "READ_TOPIC_NO_CLIENT_ID";
     Read mqttReader = MqttIO.read()
@@ -139,7 +138,7 @@ public class MqttIOTest {
   }
 
   @Test(timeout = 60 * 1000)
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testRead() throws Exception {
     PCollection<byte[]> output = pipeline.apply(
         MqttIO.read()
@@ -199,7 +198,7 @@ public class MqttIOTest {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testWrite() throws Exception {
     MQTT client = new MQTT();
     client.setHost("tcp://localhost:" + port);

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FilterJava8Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FilterJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FilterJava8Test.java
index f91371e..a3481e1 100644
--- a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FilterJava8Test.java
+++ b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FilterJava8Test.java
@@ -20,8 +20,8 @@ package org.apache.beam.sdk.transforms;
 import java.io.Serializable;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.values.PCollection;
 import org.junit.Rule;
 import org.junit.Test;
@@ -44,7 +44,7 @@ public class FilterJava8Test implements Serializable {
   public transient ExpectedException thrown = ExpectedException.none();
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testIdentityFilterByPredicate() {
 
     PCollection<Integer> output = pipeline
@@ -67,7 +67,7 @@ public class FilterJava8Test implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testFilterByPredicate() {
 
     PCollection<Integer> output = pipeline
@@ -95,7 +95,7 @@ public class FilterJava8Test implements Serializable {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void testFilterByMethodReference() {
 
     PCollection<Integer> output = pipeline

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java
index 6ba41fa..4f0361e 100644
--- a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java
+++ b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java
@@ -20,8 +20,8 @@ package org.apache.beam.sdk.transforms;
 import static org.hamcrest.Matchers.containsString;
 
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TypeDescriptor;
@@ -45,7 +45,7 @@ public class WithKeysJava8Test {
   public ExpectedException thrown = ExpectedException.none();
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void withLambdaAndTypeDescriptorShouldSucceed() {
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/a58a7412/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsJava8Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsJava8Test.java
index a0c6370..ee23d95 100644
--- a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsJava8Test.java
+++ b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsJava8Test.java
@@ -19,8 +19,8 @@ package org.apache.beam.sdk.transforms;
 
 import java.io.Serializable;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.joda.time.Instant;
@@ -40,7 +40,7 @@ public class WithTimestampsJava8Test implements Serializable {
   public final transient TestPipeline p = TestPipeline.create();
 
   @Test
-  @Category(RunnableOnService.class)
+  @Category(ValidatesRunner.class)
   public void withTimestampsLambdaShouldApplyTimestamps() {
 
     final String yearTwoThousand = "946684800000";


Mime
View raw message