Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id C29B7200BD0 for ; Wed, 16 Nov 2016 01:03:07 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id C11B6160B15; Wed, 16 Nov 2016 00:03:07 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 750A1160B03 for ; Wed, 16 Nov 2016 01:03:06 +0100 (CET) Received: (qmail 42849 invoked by uid 500); 16 Nov 2016 00:03:05 -0000 Mailing-List: contact commits-help@beam.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.incubator.apache.org Delivered-To: mailing list commits@beam.incubator.apache.org Received: (qmail 42840 invoked by uid 99); 16 Nov 2016 00:03:05 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 16 Nov 2016 00:03:05 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id 373021A015D for ; Wed, 16 Nov 2016 00:03:05 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -6.219 X-Spam-Level: X-Spam-Status: No, score=-6.219 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-2.999] autolearn=disabled Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id 0HM_cm36MdLi for ; Wed, 16 Nov 2016 00:03:01 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with SMTP id DCD435FBCD for ; Wed, 16 Nov 2016 00:03:00 +0000 (UTC) Received: (qmail 39825 invoked by uid 99); 16 Nov 2016 00:01:43 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 16 Nov 2016 00:01:43 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 71949E0209; Wed, 16 Nov 2016 00:01:43 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: amitsela@apache.org To: commits@beam.incubator.apache.org Date: Wed, 16 Nov 2016 00:01:43 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/2] incubator-beam git commit: [BEAM-983] Fix a bunch of precommit errors from #1332 archived-at: Wed, 16 Nov 2016 00:03:07 -0000 Repository: incubator-beam Updated Branches: refs/heads/master 201110222 -> dbbd5e448 [BEAM-983] Fix a bunch of precommit errors from #1332 Renames TestPipelineOptions to SparkTestPipelineOptions To avoid confusion with sdk.testing.TestPipelineOptions. Also, a couple of other minor fixes. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/dd740ee1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/dd740ee1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/dd740ee1 Branch: refs/heads/master Commit: dd740ee1b20ab6921db3620ac28499dc66511482 Parents: 2011102 Author: Eugene Kirpichov Authored: Tue Nov 15 14:25:51 2016 -0800 Committer: Sela Committed: Wed Nov 16 01:49:59 2016 +0200 ---------------------------------------------------------------------- .../runners/spark/ProvidedSparkContextTest.java | 2 - .../metrics/sink/NamedAggregatorsTest.java | 4 +- .../beam/runners/spark/io/AvroPipelineTest.java | 5 +-- .../beam/runners/spark/io/NumShardsTest.java | 5 +-- .../io/hadoop/HadoopFileFormatPipelineTest.java | 5 +-- .../spark/translation/SideEffectsTest.java | 34 ++++++--------- .../streaming/EmptyStreamAssertionTest.java | 5 ++- .../streaming/FlattenStreamingTest.java | 5 ++- .../streaming/KafkaStreamingTest.java | 5 ++- .../ResumeFromCheckpointStreamingTest.java | 5 ++- .../streaming/SimpleStreamingWordCountTest.java | 5 ++- .../utils/SparkTestPipelineOptions.java | 42 +++++++++++++++++++ .../SparkTestPipelineOptionsForStreaming.java | 44 ++++++++++++++++++++ .../streaming/utils/TestPipelineOptions.java | 25 ----------- .../utils/TestPipelineOptionsForStreaming.java | 44 -------------------- 15 files changed, 121 insertions(+), 114 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dd740ee1/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java index bc337c7..fe73aba 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java @@ -25,7 +25,6 @@ import java.util.Arrays; import java.util.List; import java.util.Set; import org.apache.beam.runners.spark.examples.WordCount; -import org.apache.beam.runners.spark.translation.streaming.utils.TestPipelineOptions; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -34,7 +33,6 @@ import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.values.PCollection; import org.apache.spark.api.java.JavaSparkContext; -import org.junit.Rule; import org.junit.Test; /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dd740ee1/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java index c220f2b..c16574c 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java @@ -28,7 +28,7 @@ import java.util.List; import java.util.Set; import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.examples.WordCount; -import org.apache.beam.runners.spark.translation.streaming.utils.TestPipelineOptions; +import org.apache.beam.runners.spark.translation.streaming.utils.SparkTestPipelineOptions; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.testing.PAssert; @@ -52,7 +52,7 @@ public class NamedAggregatorsTest { public ClearAggregatorsRule clearAggregators = new ClearAggregatorsRule(); @Rule - public final TestPipelineOptions pipelineOptions = new TestPipelineOptions(); + public final SparkTestPipelineOptions pipelineOptions = new SparkTestPipelineOptions(); private Pipeline createSparkPipeline() { SparkPipelineOptions options = pipelineOptions.getOptions(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dd740ee1/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java index 396a30d..03f96d5 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java @@ -33,8 +33,7 @@ import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; -import org.apache.beam.runners.spark.SparkPipelineOptions; -import org.apache.beam.runners.spark.translation.streaming.utils.TestPipelineOptions; +import org.apache.beam.runners.spark.translation.streaming.utils.SparkTestPipelineOptions; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.AvroIO; import org.apache.beam.sdk.values.PCollection; @@ -55,7 +54,7 @@ public class AvroPipelineTest { public final TemporaryFolder tmpDir = new TemporaryFolder(); @Rule - public final TestPipelineOptions pipelineOptions = new TestPipelineOptions(); + public final SparkTestPipelineOptions pipelineOptions = new SparkTestPipelineOptions(); @Before public void setUp() throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dd740ee1/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java index 922046c..4e5435f 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java @@ -30,9 +30,8 @@ import java.io.IOException; import java.util.Arrays; import java.util.List; import java.util.Set; -import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.examples.WordCount; -import org.apache.beam.runners.spark.translation.streaming.utils.TestPipelineOptions; +import org.apache.beam.runners.spark.translation.streaming.utils.SparkTestPipelineOptions; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.TextIO; @@ -60,7 +59,7 @@ public class NumShardsTest { public final TemporaryFolder tmpDir = new TemporaryFolder(); @Rule - public final TestPipelineOptions pipelineOptions = new TestPipelineOptions(); + public final SparkTestPipelineOptions pipelineOptions = new SparkTestPipelineOptions(); @Before public void setUp() throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dd740ee1/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java index 628d8b6..b68e8d4 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java @@ -22,9 +22,8 @@ import static org.junit.Assert.assertEquals; import java.io.File; import java.io.IOException; -import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.coders.WritableCoder; -import org.apache.beam.runners.spark.translation.streaming.utils.TestPipelineOptions; +import org.apache.beam.runners.spark.translation.streaming.utils.SparkTestPipelineOptions; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.values.KV; @@ -53,7 +52,7 @@ public class HadoopFileFormatPipelineTest { private File outputFile; @Rule - public final TestPipelineOptions pipelineOptions = new TestPipelineOptions(); + public final SparkTestPipelineOptions pipelineOptions = new SparkTestPipelineOptions(); @Rule public final TemporaryFolder tmpDir = new TemporaryFolder(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dd740ee1/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java index 7d39d89..3b79d03 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java @@ -18,52 +18,42 @@ package org.apache.beam.runners.spark.translation; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.hamcrest.core.Is.isA; import java.io.Serializable; -import java.net.URI; -import org.apache.beam.runners.spark.SparkPipelineOptions; -import org.apache.beam.runners.spark.translation.streaming.utils.TestPipelineOptions; +import org.apache.beam.runners.spark.translation.streaming.utils.SparkTestPipelineOptions; import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.coders.StringDelegateCoder; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; /** * Side effects test. */ public class SideEffectsTest implements Serializable { - - static class UserException extends RuntimeException { + private static class UserException extends RuntimeException { } @Rule - public transient final TestPipelineOptions pipelineOptions = new TestPipelineOptions(); + public final transient SparkTestPipelineOptions pipelineOptions = new SparkTestPipelineOptions(); + @Rule + public final transient ExpectedException expectedException = ExpectedException.none(); @Test public void test() throws Exception { Pipeline p = Pipeline.create(pipelineOptions.getOptions()); - p.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class)); - - p.apply(Create.of("a")).apply(ParDo.of(new OldDoFn() { - @Override + p.apply(Create.of("a")).apply(ParDo.of(new DoFn() { + @ProcessElement public void processElement(ProcessContext c) throws Exception { throw new UserException(); } })); - try { - p.run(); - fail("Run should thrown an exception"); - } catch (RuntimeException e) { - assertTrue(e.getCause() instanceof UserException); - assertNotNull(e.getCause()); - } + expectedException.expectCause(isA(UserException.class)); + p.run(); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dd740ee1/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java index 2a38e30..ec75eb7 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java @@ -25,7 +25,7 @@ import java.util.Collections; import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.io.CreateStream; import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming; -import org.apache.beam.runners.spark.translation.streaming.utils.TestPipelineOptionsForStreaming; +import org.apache.beam.runners.spark.translation.streaming.utils.SparkTestPipelineOptionsForStreaming; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.transforms.windowing.FixedWindows; @@ -51,7 +51,8 @@ public class EmptyStreamAssertionTest implements Serializable { public TemporaryFolder checkpointParentDir = new TemporaryFolder(); @Rule - public TestPipelineOptionsForStreaming commonOptions = new TestPipelineOptionsForStreaming(); + public SparkTestPipelineOptionsForStreaming commonOptions = + new SparkTestPipelineOptionsForStreaming(); @Test public void testAssertion() throws Exception { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dd740ee1/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java index bd544e9..f69bd7f 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java @@ -23,7 +23,7 @@ import java.util.List; import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.io.CreateStream; import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming; -import org.apache.beam.runners.spark.translation.streaming.utils.TestPipelineOptionsForStreaming; +import org.apache.beam.runners.spark.translation.streaming.utils.SparkTestPipelineOptionsForStreaming; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.transforms.Create; @@ -57,7 +57,8 @@ public class FlattenStreamingTest { public TemporaryFolder checkpointParentDir = new TemporaryFolder(); @Rule - public TestPipelineOptionsForStreaming commonOptions = new TestPipelineOptionsForStreaming(); + public SparkTestPipelineOptionsForStreaming commonOptions = + new SparkTestPipelineOptionsForStreaming(); @Test public void testFlattenUnbounded() throws Exception { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dd740ee1/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java index 5841331..6b2486b 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java @@ -28,7 +28,7 @@ import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.translation.streaming.utils.EmbeddedKafkaCluster; import org.apache.beam.runners.spark.translation.streaming.utils.KafkaWriteOnBatchCompleted; import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming; -import org.apache.beam.runners.spark.translation.streaming.utils.TestPipelineOptionsForStreaming; +import org.apache.beam.runners.spark.translation.streaming.utils.SparkTestPipelineOptionsForStreaming; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; @@ -71,7 +71,8 @@ public class KafkaStreamingTest { public TemporaryFolder checkpointParentDir = new TemporaryFolder(); @Rule - public TestPipelineOptionsForStreaming commonOptions = new TestPipelineOptionsForStreaming(); + public SparkTestPipelineOptionsForStreaming commonOptions = + new SparkTestPipelineOptionsForStreaming(); @Test public void testEarliest2Topics() throws Exception { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dd740ee1/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java index e345831..af93d84 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java @@ -32,7 +32,7 @@ import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.aggregators.AccumulatorSingleton; import org.apache.beam.runners.spark.translation.streaming.utils.EmbeddedKafkaCluster; import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming; -import org.apache.beam.runners.spark.translation.streaming.utils.TestPipelineOptionsForStreaming; +import org.apache.beam.runners.spark.translation.streaming.utils.SparkTestPipelineOptionsForStreaming; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.kafka.KafkaIO; @@ -80,7 +80,8 @@ public class ResumeFromCheckpointStreamingTest { public TemporaryFolder checkpointParentDir = new TemporaryFolder(); @Rule - public TestPipelineOptionsForStreaming commonOptions = new TestPipelineOptionsForStreaming(); + public SparkTestPipelineOptionsForStreaming commonOptions = + new SparkTestPipelineOptionsForStreaming(); @BeforeClass public static void init() throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dd740ee1/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java index bdfc24f..4c503c4 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java @@ -25,7 +25,7 @@ import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.examples.WordCount; import org.apache.beam.runners.spark.io.CreateStream; import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming; -import org.apache.beam.runners.spark.translation.streaming.utils.TestPipelineOptionsForStreaming; +import org.apache.beam.runners.spark.translation.streaming.utils.SparkTestPipelineOptionsForStreaming; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.transforms.MapElements; @@ -47,7 +47,8 @@ public class SimpleStreamingWordCountTest implements Serializable { public TemporaryFolder checkpointParentDir = new TemporaryFolder(); @Rule - public TestPipelineOptionsForStreaming pipelineOptions = new TestPipelineOptionsForStreaming(); + public SparkTestPipelineOptionsForStreaming pipelineOptions = + new SparkTestPipelineOptionsForStreaming(); private static final String[] WORDS = {"hi there", "hi", "hi sue bob", "hi sue", "", "bob hi"}; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dd740ee1/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/SparkTestPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/SparkTestPipelineOptions.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/SparkTestPipelineOptions.java new file mode 100644 index 0000000..2da9888 --- /dev/null +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/SparkTestPipelineOptions.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.spark.translation.streaming.utils; + +import org.apache.beam.runners.spark.SparkPipelineOptions; +import org.apache.beam.runners.spark.SparkRunner; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.junit.rules.ExternalResource; + +/** + * A rule to create a common {@link SparkPipelineOptions} test options for spark-runner. + */ +public class SparkTestPipelineOptions extends ExternalResource { + + protected final SparkPipelineOptions options = + PipelineOptionsFactory.as(SparkPipelineOptions.class); + + @Override + protected void before() throws Throwable { + options.setRunner(SparkRunner.class); + options.setEnableSparkMetricSinks(false); + } + + public SparkPipelineOptions getOptions() { + return options; + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dd740ee1/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/SparkTestPipelineOptionsForStreaming.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/SparkTestPipelineOptionsForStreaming.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/SparkTestPipelineOptionsForStreaming.java new file mode 100644 index 0000000..1c0b68a --- /dev/null +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/SparkTestPipelineOptionsForStreaming.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.spark.translation.streaming.utils; + +import java.io.IOException; +import org.apache.beam.runners.spark.SparkPipelineOptions; +import org.junit.rules.TemporaryFolder; + + +/** + * A rule to create a common {@link SparkPipelineOptions} for testing streaming pipelines. + */ +public class SparkTestPipelineOptionsForStreaming extends SparkTestPipelineOptions { + + @Override + protected void before() throws Throwable { + super.before(); + options.setTimeout(1000L); + options.setStreaming(true); + } + + public SparkPipelineOptions withTmpCheckpointDir(TemporaryFolder parent) + throws IOException { + // tests use JUnit's TemporaryFolder path in the form of: /.../junit/... + options.setCheckpointDir(parent.newFolder(options.getJobName()).toURI().toURL().toString()); + return options; + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dd740ee1/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestPipelineOptions.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestPipelineOptions.java deleted file mode 100644 index ccfb29e..0000000 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestPipelineOptions.java +++ /dev/null @@ -1,25 +0,0 @@ -package org.apache.beam.runners.spark.translation.streaming.utils; - -import org.apache.beam.runners.spark.SparkPipelineOptions; -import org.apache.beam.runners.spark.SparkRunner; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.junit.rules.ExternalResource; - -/** - * A rule to create a common {@link SparkPipelineOptions} test options for spark-runner. - */ -public class TestPipelineOptions extends ExternalResource { - - protected final SparkPipelineOptions options = - PipelineOptionsFactory.as(SparkPipelineOptions.class); - - @Override - protected void before() throws Throwable { - options.setRunner(SparkRunner.class); - options.setEnableSparkMetricSinks(false); - } - - public SparkPipelineOptions getOptions() { - return options; - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dd740ee1/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestPipelineOptionsForStreaming.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestPipelineOptionsForStreaming.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestPipelineOptionsForStreaming.java deleted file mode 100644 index 3d178ae..0000000 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestPipelineOptionsForStreaming.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.spark.translation.streaming.utils; - -import org.apache.beam.runners.spark.SparkPipelineOptions; -import org.junit.rules.TemporaryFolder; -import java.io.IOException; - - -/** - * A rule to create a common {@link SparkPipelineOptions} for testing streaming pipelines. - */ -public class TestPipelineOptionsForStreaming extends TestPipelineOptions { - - @Override - protected void before() throws Throwable { - super.before(); - options.setTimeout(1000L); - options.setStreaming(true); - } - - public SparkPipelineOptions withTmpCheckpointDir(TemporaryFolder parent) - throws IOException { - // tests use JUnit's TemporaryFolder path in the form of: /.../junit/... - options.setCheckpointDir(parent.newFolder(options.getJobName()).toURI().toURL().toString()); - return options; - } -}