beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amits...@apache.org
Subject [1/2] incubator-beam git commit: [BEAM-983] Fix a bunch of precommit errors from #1332
Date Wed, 16 Nov 2016 00:01:43 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master 201110222 -> dbbd5e448


[BEAM-983] Fix a bunch of precommit errors from #1332

Renames TestPipelineOptions to SparkTestPipelineOptions

To avoid confusion with sdk.testing.TestPipelineOptions.
Also, a couple of other minor fixes.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/dd740ee1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/dd740ee1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/dd740ee1

Branch: refs/heads/master
Commit: dd740ee1b20ab6921db3620ac28499dc66511482
Parents: 2011102
Author: Eugene Kirpichov <kirpichov@google.com>
Authored: Tue Nov 15 14:25:51 2016 -0800
Committer: Sela <ansela@paypal.com>
Committed: Wed Nov 16 01:49:59 2016 +0200

----------------------------------------------------------------------
 .../runners/spark/ProvidedSparkContextTest.java |  2 -
 .../metrics/sink/NamedAggregatorsTest.java      |  4 +-
 .../beam/runners/spark/io/AvroPipelineTest.java |  5 +--
 .../beam/runners/spark/io/NumShardsTest.java    |  5 +--
 .../io/hadoop/HadoopFileFormatPipelineTest.java |  5 +--
 .../spark/translation/SideEffectsTest.java      | 34 ++++++---------
 .../streaming/EmptyStreamAssertionTest.java     |  5 ++-
 .../streaming/FlattenStreamingTest.java         |  5 ++-
 .../streaming/KafkaStreamingTest.java           |  5 ++-
 .../ResumeFromCheckpointStreamingTest.java      |  5 ++-
 .../streaming/SimpleStreamingWordCountTest.java |  5 ++-
 .../utils/SparkTestPipelineOptions.java         | 42 +++++++++++++++++++
 .../SparkTestPipelineOptionsForStreaming.java   | 44 ++++++++++++++++++++
 .../streaming/utils/TestPipelineOptions.java    | 25 -----------
 .../utils/TestPipelineOptionsForStreaming.java  | 44 --------------------
 15 files changed, 121 insertions(+), 114 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dd740ee1/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
b/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
index bc337c7..fe73aba 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
@@ -25,7 +25,6 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
 import org.apache.beam.runners.spark.examples.WordCount;
-import org.apache.beam.runners.spark.translation.streaming.utils.TestPipelineOptions;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -34,7 +33,6 @@ import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.junit.Rule;
 import org.junit.Test;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dd740ee1/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java
b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java
index c220f2b..c16574c 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java
@@ -28,7 +28,7 @@ import java.util.List;
 import java.util.Set;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.examples.WordCount;
-import org.apache.beam.runners.spark.translation.streaming.utils.TestPipelineOptions;
+import org.apache.beam.runners.spark.translation.streaming.utils.SparkTestPipelineOptions;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.testing.PAssert;
@@ -52,7 +52,7 @@ public class NamedAggregatorsTest {
   public ClearAggregatorsRule clearAggregators = new ClearAggregatorsRule();
 
   @Rule
-  public final TestPipelineOptions pipelineOptions = new TestPipelineOptions();
+  public final SparkTestPipelineOptions pipelineOptions = new SparkTestPipelineOptions();
 
   private Pipeline createSparkPipeline() {
     SparkPipelineOptions options = pipelineOptions.getOptions();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dd740ee1/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
index 396a30d..03f96d5 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
@@ -33,8 +33,7 @@ import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.runners.spark.translation.streaming.utils.TestPipelineOptions;
+import org.apache.beam.runners.spark.translation.streaming.utils.SparkTestPipelineOptions;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.AvroIO;
 import org.apache.beam.sdk.values.PCollection;
@@ -55,7 +54,7 @@ public class AvroPipelineTest {
   public final TemporaryFolder tmpDir = new TemporaryFolder();
 
   @Rule
-  public final TestPipelineOptions pipelineOptions = new TestPipelineOptions();
+  public final SparkTestPipelineOptions pipelineOptions = new SparkTestPipelineOptions();
 
   @Before
   public void setUp() throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dd740ee1/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
index 922046c..4e5435f 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
@@ -30,9 +30,8 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
-import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.examples.WordCount;
-import org.apache.beam.runners.spark.translation.streaming.utils.TestPipelineOptions;
+import org.apache.beam.runners.spark.translation.streaming.utils.SparkTestPipelineOptions;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.TextIO;
@@ -60,7 +59,7 @@ public class NumShardsTest {
   public final TemporaryFolder tmpDir = new TemporaryFolder();
 
   @Rule
-  public final TestPipelineOptions pipelineOptions = new TestPipelineOptions();
+  public final SparkTestPipelineOptions pipelineOptions = new SparkTestPipelineOptions();
 
   @Before
   public void setUp() throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dd740ee1/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
index 628d8b6..b68e8d4 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
@@ -22,9 +22,8 @@ import static org.junit.Assert.assertEquals;
 
 import java.io.File;
 import java.io.IOException;
-import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.coders.WritableCoder;
-import org.apache.beam.runners.spark.translation.streaming.utils.TestPipelineOptions;
+import org.apache.beam.runners.spark.translation.streaming.utils.SparkTestPipelineOptions;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.values.KV;
@@ -53,7 +52,7 @@ public class HadoopFileFormatPipelineTest {
   private File outputFile;
 
   @Rule
-  public final TestPipelineOptions pipelineOptions = new TestPipelineOptions();
+  public final SparkTestPipelineOptions pipelineOptions = new SparkTestPipelineOptions();
 
   @Rule
   public final TemporaryFolder tmpDir = new TemporaryFolder();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dd740ee1/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
index 7d39d89..3b79d03 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
@@ -18,52 +18,42 @@
 
 package org.apache.beam.runners.spark.translation;
 
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.hamcrest.core.Is.isA;
 
 import java.io.Serializable;
-import java.net.URI;
-import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.runners.spark.translation.streaming.utils.TestPipelineOptions;
+import org.apache.beam.runners.spark.translation.streaming.utils.SparkTestPipelineOptions;
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.StringDelegateCoder;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 /**
  * Side effects test.
  */
 public class SideEffectsTest implements Serializable {
-
-  static class UserException extends RuntimeException {
+  private static class UserException extends RuntimeException {
   }
 
   @Rule
-  public transient final TestPipelineOptions pipelineOptions = new TestPipelineOptions();
+  public final transient SparkTestPipelineOptions pipelineOptions = new SparkTestPipelineOptions();
+  @Rule
+  public final transient ExpectedException expectedException = ExpectedException.none();
 
   @Test
   public void test() throws Exception {
     Pipeline p = Pipeline.create(pipelineOptions.getOptions());
 
-    p.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class));
-
-    p.apply(Create.of("a")).apply(ParDo.of(new OldDoFn<String, String>() {
-      @Override
+    p.apply(Create.of("a")).apply(ParDo.of(new DoFn<String, String>() {
+      @ProcessElement
       public void processElement(ProcessContext c) throws Exception {
         throw new UserException();
       }
     }));
 
-    try {
-      p.run();
-      fail("Run should thrown an exception");
-    } catch (RuntimeException e) {
-      assertTrue(e.getCause() instanceof UserException);
-      assertNotNull(e.getCause());
-    }
+    expectedException.expectCause(isA(UserException.class));
+    p.run();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dd740ee1/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java
index 2a38e30..ec75eb7 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java
@@ -25,7 +25,7 @@ import java.util.Collections;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.io.CreateStream;
 import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming;
-import org.apache.beam.runners.spark.translation.streaming.utils.TestPipelineOptionsForStreaming;
+import org.apache.beam.runners.spark.translation.streaming.utils.SparkTestPipelineOptionsForStreaming;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
@@ -51,7 +51,8 @@ public class EmptyStreamAssertionTest implements Serializable {
   public TemporaryFolder checkpointParentDir = new TemporaryFolder();
 
   @Rule
-  public TestPipelineOptionsForStreaming commonOptions = new TestPipelineOptionsForStreaming();
+  public SparkTestPipelineOptionsForStreaming commonOptions =
+      new SparkTestPipelineOptionsForStreaming();
 
   @Test
   public void testAssertion() throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dd740ee1/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
index bd544e9..f69bd7f 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
@@ -23,7 +23,7 @@ import java.util.List;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.io.CreateStream;
 import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming;
-import org.apache.beam.runners.spark.translation.streaming.utils.TestPipelineOptionsForStreaming;
+import org.apache.beam.runners.spark.translation.streaming.utils.SparkTestPipelineOptionsForStreaming;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.transforms.Create;
@@ -57,7 +57,8 @@ public class FlattenStreamingTest {
   public TemporaryFolder checkpointParentDir = new TemporaryFolder();
 
   @Rule
-  public TestPipelineOptionsForStreaming commonOptions = new TestPipelineOptionsForStreaming();
+  public SparkTestPipelineOptionsForStreaming commonOptions =
+      new SparkTestPipelineOptionsForStreaming();
 
   @Test
   public void testFlattenUnbounded() throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dd740ee1/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
index 5841331..6b2486b 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
@@ -28,7 +28,7 @@ import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.translation.streaming.utils.EmbeddedKafkaCluster;
 import org.apache.beam.runners.spark.translation.streaming.utils.KafkaWriteOnBatchCompleted;
 import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming;
-import org.apache.beam.runners.spark.translation.streaming.utils.TestPipelineOptionsForStreaming;
+import org.apache.beam.runners.spark.translation.streaming.utils.SparkTestPipelineOptionsForStreaming;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -71,7 +71,8 @@ public class KafkaStreamingTest {
   public TemporaryFolder checkpointParentDir = new TemporaryFolder();
 
   @Rule
-  public TestPipelineOptionsForStreaming commonOptions = new TestPipelineOptionsForStreaming();
+  public SparkTestPipelineOptionsForStreaming commonOptions =
+      new SparkTestPipelineOptionsForStreaming();
 
   @Test
   public void testEarliest2Topics() throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dd740ee1/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
index e345831..af93d84 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
@@ -32,7 +32,7 @@ import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.aggregators.AccumulatorSingleton;
 import org.apache.beam.runners.spark.translation.streaming.utils.EmbeddedKafkaCluster;
 import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming;
-import org.apache.beam.runners.spark.translation.streaming.utils.TestPipelineOptionsForStreaming;
+import org.apache.beam.runners.spark.translation.streaming.utils.SparkTestPipelineOptionsForStreaming;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.kafka.KafkaIO;
@@ -80,7 +80,8 @@ public class ResumeFromCheckpointStreamingTest {
   public TemporaryFolder checkpointParentDir = new TemporaryFolder();
 
   @Rule
-  public TestPipelineOptionsForStreaming commonOptions = new TestPipelineOptionsForStreaming();
+  public SparkTestPipelineOptionsForStreaming commonOptions =
+      new SparkTestPipelineOptionsForStreaming();
 
   @BeforeClass
   public static void init() throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dd740ee1/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
index bdfc24f..4c503c4 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
@@ -25,7 +25,7 @@ import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.examples.WordCount;
 import org.apache.beam.runners.spark.io.CreateStream;
 import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming;
-import org.apache.beam.runners.spark.translation.streaming.utils.TestPipelineOptionsForStreaming;
+import org.apache.beam.runners.spark.translation.streaming.utils.SparkTestPipelineOptionsForStreaming;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.transforms.MapElements;
@@ -47,7 +47,8 @@ public class SimpleStreamingWordCountTest implements Serializable {
   public TemporaryFolder checkpointParentDir = new TemporaryFolder();
 
   @Rule
-  public TestPipelineOptionsForStreaming pipelineOptions = new TestPipelineOptionsForStreaming();
+  public SparkTestPipelineOptionsForStreaming pipelineOptions =
+      new SparkTestPipelineOptionsForStreaming();
 
   private static final String[] WORDS = {"hi there", "hi", "hi sue bob", "hi sue", "", "bob hi"};
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dd740ee1/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/SparkTestPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/SparkTestPipelineOptions.java
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/SparkTestPipelineOptions.java
new file mode 100644
index 0000000..2da9888
--- /dev/null
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/SparkTestPipelineOptions.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation.streaming.utils;
+
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.runners.spark.SparkRunner;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.junit.rules.ExternalResource;
+
+/**
+ * A rule to create a common {@link SparkPipelineOptions} test options for spark-runner.
+ */
+public class SparkTestPipelineOptions extends ExternalResource {
+
+  protected final SparkPipelineOptions options =
+      PipelineOptionsFactory.as(SparkPipelineOptions.class);
+
+  @Override
+  protected void before() throws Throwable {
+    options.setRunner(SparkRunner.class);
+    options.setEnableSparkMetricSinks(false);
+  }
+
+  public SparkPipelineOptions getOptions() {
+    return options;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dd740ee1/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/SparkTestPipelineOptionsForStreaming.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/SparkTestPipelineOptionsForStreaming.java
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/SparkTestPipelineOptionsForStreaming.java
new file mode 100644
index 0000000..1c0b68a
--- /dev/null
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/SparkTestPipelineOptionsForStreaming.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.translation.streaming.utils;
+
+import java.io.IOException;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.junit.rules.TemporaryFolder;
+
+
+/**
+ * A rule to create a common {@link SparkPipelineOptions} for testing streaming pipelines.
+ */
+public class SparkTestPipelineOptionsForStreaming extends SparkTestPipelineOptions {
+
+  @Override
+  protected void before() throws Throwable {
+    super.before();
+    options.setTimeout(1000L);
+    options.setStreaming(true);
+  }
+
+  public SparkPipelineOptions withTmpCheckpointDir(TemporaryFolder parent)
+      throws IOException {
+    // tests use JUnit's TemporaryFolder path in the form of: /.../junit/...
+    options.setCheckpointDir(parent.newFolder(options.getJobName()).toURI().toURL().toString());
+    return options;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dd740ee1/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestPipelineOptions.java
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestPipelineOptions.java
deleted file mode 100644
index ccfb29e..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestPipelineOptions.java
+++ /dev/null
@@ -1,25 +0,0 @@
-package org.apache.beam.runners.spark.translation.streaming.utils;
-
-import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.runners.spark.SparkRunner;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.junit.rules.ExternalResource;
-
-/**
- * A rule to create a common {@link SparkPipelineOptions} test options for spark-runner.
- */
-public class TestPipelineOptions extends ExternalResource {
-
-  protected final SparkPipelineOptions options =
-      PipelineOptionsFactory.as(SparkPipelineOptions.class);
-
-  @Override
-  protected void before() throws Throwable {
-    options.setRunner(SparkRunner.class);
-    options.setEnableSparkMetricSinks(false);
-  }
-
-  public SparkPipelineOptions getOptions() {
-    return options;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dd740ee1/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestPipelineOptionsForStreaming.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestPipelineOptionsForStreaming.java
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestPipelineOptionsForStreaming.java
deleted file mode 100644
index 3d178ae..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestPipelineOptionsForStreaming.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.translation.streaming.utils;
-
-import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.junit.rules.TemporaryFolder;
-import java.io.IOException;
-
-
-/**
- * A rule to create a common {@link SparkPipelineOptions} for testing streaming pipelines.
- */
-public class TestPipelineOptionsForStreaming extends TestPipelineOptions {
-
-  @Override
-  protected void before() throws Throwable {
-    super.before();
-    options.setTimeout(1000L);
-    options.setStreaming(true);
-  }
-
-  public SparkPipelineOptions withTmpCheckpointDir(TemporaryFolder parent)
-      throws IOException {
-    // tests use JUnit's TemporaryFolder path in the form of: /.../junit/...
-    options.setCheckpointDir(parent.newFolder(options.getJobName()).toURI().toURL().toString());
-    return options;
-  }
-}


Mime
View raw message