From: amitsela@apache.org
To: commits@beam.apache.org
Reply-To: dev@beam.apache.org
Date: Tue, 28 Feb 2017 22:35:14 -0000
Message-Id: <9797756996f44c799711ea3c979eb231@git.apache.org>
In-Reply-To: <3e2cafa6ede44a3baf79a539dd9e5e71@git.apache.org>
References: <3e2cafa6ede44a3baf79a539dd9e5e71@git.apache.org>
Subject: [07/23] beam git commit: Remove streaming tests that were needed before supporting the model.

Remove streaming tests that were needed before supporting the model.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c25a02f4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c25a02f4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c25a02f4

Branch: refs/heads/master
Commit: c25a02f459a30de5b15bb404045258e486a94266
Parents: 96d373f
Author: Sela
Authored: Sat Feb 18 22:06:51 2017 +0200
Committer: Sela
Committed: Wed Mar 1 00:17:59 2017 +0200

----------------------------------------------------------------------
 .../streaming/EmptyStreamAssertionTest.java     |  87 ------
 .../streaming/FlattenStreamingTest.java         |  82 ------
 .../streaming/KafkaStreamingTest.java           | 270 -------------------
 .../streaming/SimpleStreamingWordCountTest.java |  84 ------
 .../utils/KafkaWriteOnBatchCompleted.java       | 105 --------
 .../streaming/utils/PAssertStreaming.java       | 121 ---------
 6 files changed, 749 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/c25a02f4/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java
deleted file mode 100644
index e482945..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.spark.translation.streaming;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.Serializable;
-import java.util.Collections;
-import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.runners.spark.aggregators.ClearAggregatorsRule;
-import org.apache.beam.runners.spark.io.CreateStream;
-import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming;
-import org.apache.beam.runners.spark.translation.streaming.utils.SparkTestPipelineOptionsForStreaming;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.PCollection;
-import org.joda.time.Duration;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-
-/**
- * Test that {@link PAssertStreaming} can tell if the stream is empty.
- */
-public class EmptyStreamAssertionTest implements Serializable {
-
-  private static final String EXPECTED_ERR =
-      "Success aggregator should be greater than zero.\n"
-          + "Expected: not <0>\n"
-          + "     but: was <0>";
-
-  @Rule
-  public TemporaryFolder checkpointParentDir = new TemporaryFolder();
-
-  @Rule
-  public SparkTestPipelineOptionsForStreaming commonOptions =
-      new SparkTestPipelineOptionsForStreaming();
-
-  @Rule
-  public ClearAggregatorsRule clearAggregatorsRule = new ClearAggregatorsRule();
-
-  @Test
-  public void testAssertion() throws Exception {
-    SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(checkpointParentDir);
-    options.setStreaming(true);
-
-    Duration windowDuration = new Duration(options.getBatchIntervalMillis());
-
-    Pipeline pipeline = Pipeline.create(options);
-
-    PCollection<String> output =
-        pipeline
-            .apply(CreateStream.fromQueue(Collections.<Iterable<String>>emptyList()))
-            .setCoder(StringUtf8Coder.of())
-            .apply(Window.<String>into(FixedWindows.of(windowDuration)));
-
-    try {
-      PAssertStreaming.runAndAssertContents(pipeline, output, new String[0],
-          Duration.standardSeconds(1L));
-    } catch (AssertionError e) {
-      assertTrue("Expected error message: " + EXPECTED_ERR + " but got: " + e.getMessage(),
-          e.getMessage().equals(EXPECTED_ERR));
-      return;
-    }
-    fail("assertion should have failed");
-    throw new RuntimeException("unreachable");
-  }
-}
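
For context, once a runner fully supports the Beam model, an empty stream can be
asserted directly with the SDK's PAssert instead of the aggregator workaround in
the removed test above. A minimal sketch, not part of this commit:

    // With model support, PAssert works on streaming pipelines directly.
    PAssert.that(output).empty();  // asserts the PCollection has no elements
    pipeline.run().waitUntilFinish();
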

http://git-wip-us.apache.org/repos/asf/beam/blob/c25a02f4/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
deleted file mode 100644
index fc40bbd..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.spark.translation.streaming;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.runners.spark.io.CreateStream;
-import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming;
-import org.apache.beam.runners.spark.translation.streaming.utils.SparkTestPipelineOptionsForStreaming;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionList;
-import org.joda.time.Duration;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-/**
- * Test Flatten (union) implementation for streaming.
- */
-public class FlattenStreamingTest {
-
-  private static final String[] WORDS_ARRAY_1 = {
-      "one", "two", "three", "four"};
-  private static final List<Iterable<String>> WORDS_QUEUE_1 =
-      Collections.<Iterable<String>>singletonList(Arrays.asList(WORDS_ARRAY_1));
-  private static final String[] WORDS_ARRAY_2 = {
-      "five", "six", "seven", "eight"};
-  private static final List<Iterable<String>> WORDS_QUEUE_2 =
-      Collections.<Iterable<String>>singletonList(Arrays.asList(WORDS_ARRAY_2));
-  private static final String[] EXPECTED_UNION = {
-      "one", "two", "three", "four", "five", "six", "seven", "eight"};
-
-  @Rule
-  public TemporaryFolder checkpointParentDir = new TemporaryFolder();
-
-  @Rule
-  public SparkTestPipelineOptionsForStreaming commonOptions =
-      new SparkTestPipelineOptionsForStreaming();
-
-  @Test
-  public void testFlattenUnbounded() throws Exception {
-    SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(checkpointParentDir);
-    options.setStreaming(true);
-
-    Pipeline p = Pipeline.create(options);
-    PCollection<String> w1 =
-        p.apply(CreateStream.fromQueue(WORDS_QUEUE_1)).setCoder(StringUtf8Coder.of());
-    PCollection<String> windowedW1 =
-        w1.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(1))));
-    PCollection<String> w2 =
-        p.apply(CreateStream.fromQueue(WORDS_QUEUE_2)).setCoder(StringUtf8Coder.of());
-    PCollection<String> windowedW2 =
-        w2.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(1))));
-    PCollectionList<String> list = PCollectionList.of(windowedW1).and(windowedW2);
-    PCollection<String> union = list.apply(Flatten.<String>pCollections());
-
-    PAssertStreaming.runAndAssertContents(p, union, EXPECTED_UNION, Duration.standardSeconds(1L));
-  }
-
-}
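
The queue-backed CreateStream input above predates model-conformant test sources.
With full model support, the same input can be described with the SDK's TestStream
and asserted with PAssert. A rough sketch, not part of this commit:

    TestStream<String> source = TestStream.create(StringUtf8Coder.of())
        .addElements("one", "two", "three", "four")
        .addElements("five", "six", "seven", "eight")
        .advanceWatermarkToInfinity();  // signal that no more input follows

    PCollection<String> words = p.apply(source);
    PAssert.that(words).containsInAnyOrder(
        "one", "two", "three", "four", "five", "six", "seven", "eight");
    p.run();
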

http://git-wip-us.apache.org/repos/asf/beam/blob/c25a02f4/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
deleted file mode 100644
index 404cb5d..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
+++ /dev/null
@@ -1,270 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.spark.translation.streaming;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.google.common.collect.ImmutableMap;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.ObjectStreamException;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Properties;
-import org.apache.beam.runners.spark.SparkContextOptions;
-import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.runners.spark.translation.streaming.utils.EmbeddedKafkaCluster;
-import org.apache.beam.runners.spark.translation.streaming.utils.KafkaWriteOnBatchCompleted;
-import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming;
-import org.apache.beam.runners.spark.translation.streaming.utils.SparkTestPipelineOptionsForStreaming;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.kafka.KafkaIO;
-import org.apache.beam.sdk.transforms.Distinct;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.spark.streaming.api.java.JavaStreamingListener;
-import org.joda.time.Duration;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-
-/**
- * Test Kafka as input.
- */
-public class KafkaStreamingTest {
-  private static final EmbeddedKafkaCluster.EmbeddedZookeeper EMBEDDED_ZOOKEEPER =
-      new EmbeddedKafkaCluster.EmbeddedZookeeper();
-  private static final EmbeddedKafkaCluster EMBEDDED_KAFKA_CLUSTER =
-      new EmbeddedKafkaCluster(EMBEDDED_ZOOKEEPER.getConnection());
-
-  @BeforeClass
-  public static void init() throws IOException {
-    EMBEDDED_ZOOKEEPER.startup();
-    EMBEDDED_KAFKA_CLUSTER.startup();
-  }
-
-  @Rule
-  public TemporaryFolder checkpointParentDir = new TemporaryFolder();
-
-  @Rule
-  public SparkTestPipelineOptionsForStreaming commonOptions =
-      new SparkTestPipelineOptionsForStreaming();
-
-  @Test
-  public void testEarliest2Topics() throws Exception {
-    Duration batchIntervalDuration = Duration.standardSeconds(5);
-    SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(checkpointParentDir);
-    // provide a generous enough batch-interval to have everything fit in one micro-batch.
-    options.setBatchIntervalMillis(batchIntervalDuration.getMillis());
-    // provide a very generous read time bound; we rely on the num-records bound here.
-    options.setMinReadTimeMillis(batchIntervalDuration.minus(1).getMillis());
-    // bound the read on the number of messages - 2 topics of 4 messages each.
-    options.setMaxRecordsPerBatch(8L);
-
-    //--- setup
-    // two topics.
-    final String topic1 = "topic1";
-    final String topic2 = "topic2";
-    // messages.
-    final Map<String, String> messages = ImmutableMap.of(
-        "k1", "v1", "k2", "v2", "k3", "v3", "k4", "v4"
-    );
-    // expected.
-    final String[] expected = {"k1,v1", "k2,v2", "k3,v3", "k4,v4"};
-
-    // write to both topics ahead.
-    produce(topic1, messages);
-    produce(topic2, messages);
-
-    //------- test: read and dedup.
-    Pipeline p = Pipeline.create(options);
-
-    Map<String, Object> consumerProps = ImmutableMap.<String, Object>of(
-        "auto.offset.reset", "earliest"
-    );
-
-    KafkaIO.Read<String, String> read = KafkaIO.<String, String>read()
-        .withBootstrapServers(EMBEDDED_KAFKA_CLUSTER.getBrokerList())
-        .withTopics(Arrays.asList(topic1, topic2))
-        .withKeyCoder(StringUtf8Coder.of())
-        .withValueCoder(StringUtf8Coder.of())
-        .updateConsumerProperties(consumerProps);
-
-    PCollection<String> deduped =
-        p.apply(read.withoutMetadata()).setCoder(
-            KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
-        .apply(Window.<KV<String, String>>into(FixedWindows.of(batchIntervalDuration)))
-        .apply(ParDo.of(new FormatKVFn()))
-        .apply(Distinct.<String>create());
-
-    // graceful shutdown will make sure the first batch (at least) will finish.
-    Duration timeout = Duration.standardSeconds(1L);
-    PAssertStreaming.runAndAssertContents(p, deduped, expected, timeout);
-  }
-
-  @Test
-  public void testLatest() throws Exception {
-    Duration batchIntervalDuration = Duration.standardSeconds(5);
-    SparkContextOptions options =
-        commonOptions.withTmpCheckpointDir(checkpointParentDir).as(SparkContextOptions.class);
-    // provide a generous enough batch-interval to have everything fit in one micro-batch.
-    options.setBatchIntervalMillis(batchIntervalDuration.getMillis());
-    // provide a very generous read time bound; we rely on the num-records bound here.
-    options.setMinReadTimeMillis(batchIntervalDuration.minus(1).getMillis());
-    // bound the read on the number of messages - 1 topic of 4 messages.
-    options.setMaxRecordsPerBatch(4L);
-
-    //--- setup
-    final String topic = "topic";
-    // messages.
-    final Map<String, String> messages = ImmutableMap.of(
-        "k1", "v1", "k2", "v2", "k3", "v3", "k4", "v4"
-    );
-    // expected.
-    final String[] expected = {"k1,v1", "k2,v2", "k3,v3", "k4,v4"};
-
-    // write once the first batch completes; this will guarantee latest-like behaviour.
-    options.setListeners(Collections.<JavaStreamingListener>singletonList(
-        KafkaWriteOnBatchCompleted.once(messages, Collections.singletonList(topic),
-            EMBEDDED_KAFKA_CLUSTER.getProps(), EMBEDDED_KAFKA_CLUSTER.getBrokerList())));
-
-    //------- test: read and format.
-    Pipeline p = Pipeline.create(options);
-
-    Map<String, Object> consumerProps = ImmutableMap.<String, Object>of(
-        "auto.offset.reset", "latest"
-    );
-
-    KafkaIO.Read<String, String> read = KafkaIO.<String, String>read()
-        .withBootstrapServers(EMBEDDED_KAFKA_CLUSTER.getBrokerList())
-        .withTopics(Collections.singletonList(topic))
-        .withKeyCoder(StringUtf8Coder.of())
-        .withValueCoder(NonKryoSerializableStringCoder.of())
-        .updateConsumerProperties(consumerProps);
-
-    PCollection<String> formatted =
-        p.apply(read.withoutMetadata()).setCoder(
-            KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
-        .apply(Window.<KV<String, String>>into(FixedWindows.of(batchIntervalDuration)))
-        .apply(ParDo.of(new FormatKVFn()));
-
-    // run for more than 1 batch interval, so that reading of latest is attempted in the
-    // first batch with no luck, while the OnBatchCompleted injected-input afterwards will be read
-    // in the second interval.
-    PAssertStreaming.runAndAssertContents(p, formatted, expected, Duration.standardSeconds(3));
-  }
-
-  private static void produce(String topic, Map<String, String> messages) {
-    Serializer<String> stringSerializer = new StringSerializer();
-    try (@SuppressWarnings("unchecked") KafkaProducer<String, String> kafkaProducer =
-        new KafkaProducer(defaultProducerProps(), stringSerializer, stringSerializer)) {
-      // feed topic.
-      for (Map.Entry<String, String> en : messages.entrySet()) {
-        kafkaProducer.send(new ProducerRecord<>(topic, en.getKey(), en.getValue()));
-      }
-      // await send completion.
-      kafkaProducer.flush();
-    }
-  }
-
-  private static Properties defaultProducerProps() {
-    Properties producerProps = new Properties();
-    producerProps.putAll(EMBEDDED_KAFKA_CLUSTER.getProps());
-    producerProps.put("acks", "1");
-    producerProps.put("bootstrap.servers", EMBEDDED_KAFKA_CLUSTER.getBrokerList());
-    return producerProps;
-  }
-
-  @AfterClass
-  public static void tearDown() {
-    EMBEDDED_KAFKA_CLUSTER.shutdown();
-    EMBEDDED_ZOOKEEPER.shutdown();
-  }
-
-  private static class FormatKVFn extends DoFn<KV<String, String>, String> {
-    @ProcessElement
-    public void processElement(ProcessContext c) {
-      c.output(c.element().getKey() + "," + c.element().getValue());
-    }
-  }
-
-  /**
-   * This coder is not Kryo serializable; it is used to make sure
-   * {@link org.apache.beam.runners.spark.coders.BeamSparkRunnerRegistrator} registers the needed
-   * classes to ensure Java serialization is used instead.
-   */
-  private static class NonKryoSerializableStringCoder extends CustomCoder<String>
-      implements Serializable {
-    private Coder<String> stringCoder;
-    private Boolean isSerialized = false;
-
-    private NonKryoSerializableStringCoder() {
-    }
-
-    @JsonCreator
-    public static NonKryoSerializableStringCoder of() {
-      return new NonKryoSerializableStringCoder();
-    }
-
-    private Object readResolve() throws ObjectStreamException {
-      NonKryoSerializableStringCoder deserialized = new NonKryoSerializableStringCoder();
-      deserialized.stringCoder = StringUtf8Coder.of();
-      deserialized.isSerialized = true;
-      return deserialized;
-    }
-
-    private Object writeReplace() throws ObjectStreamException {
-      return new NonKryoSerializableStringCoder();
-    }
-
-    @Override
-    public void encode(String value, OutputStream outStream, Context context)
-        throws CoderException, IOException {
-      if (!isSerialized) {
-        this.stringCoder = StringUtf8Coder.of();
-      }
-      stringCoder.encode(value, outStream, context);
-    }
-
-    @Override
-    public String decode(InputStream inStream, Context context) throws CoderException, IOException {
-      if (!isSerialized) {
-        this.stringCoder = StringUtf8Coder.of();
-      }
-      return stringCoder.decode(inStream, context);
-    }
-  }
-}
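
The tests above bound the read with Spark-specific options (setMaxRecordsPerBatch
and setMinReadTimeMillis). KafkaIO can also bound the read at the source, which
turns the unbounded read into a bounded one for testing. A sketch under the
assumption that withMaxNumRecords is available in the KafkaIO version in use;
"localhost:9092" is a hypothetical broker address:

    // A bounded test read over two topics, stopping after 8 records in total.
    PCollection<KV<String, String>> kafkaInput = p.apply(
        KafkaIO.<String, String>read()
            .withBootstrapServers("localhost:9092")
            .withTopics(Arrays.asList("topic1", "topic2"))
            .withKeyCoder(StringUtf8Coder.of())
            .withValueCoder(StringUtf8Coder.of())
            .withMaxNumRecords(8)  // makes the source bounded for the test
            .withoutMetadata());
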

http://git-wip-us.apache.org/repos/asf/beam/blob/c25a02f4/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
deleted file mode 100644
index 3734cf6..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.spark.translation.streaming;
-
-import com.google.common.collect.Lists;
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.List;
-import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.runners.spark.examples.WordCount;
-import org.apache.beam.runners.spark.io.CreateStream;
-import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming;
-import org.apache.beam.runners.spark.translation.streaming.utils.SparkTestPipelineOptionsForStreaming;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.PCollection;
-import org.joda.time.Duration;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-
-/**
- * Simple word count streaming test.
- */
-public class SimpleStreamingWordCountTest implements Serializable {
-
-  @Rule
-  public TemporaryFolder checkpointParentDir = new TemporaryFolder();
-
-  @Rule
-  public SparkTestPipelineOptionsForStreaming pipelineOptions =
-      new SparkTestPipelineOptionsForStreaming();
-
-  private static final String[] WORDS = {"hi there", "hi", "hi sue bob", "hi sue", "", "bob hi"};
-
-  private static final List<Iterable<String>> MANY_WORDS =
-      Lists.<Iterable<String>>newArrayList(Arrays.asList(WORDS), Arrays.asList(WORDS));
-
-  private static final String[] EXPECTED_WORD_COUNTS = {"hi: 10", "there: 2", "sue: 4", "bob: 4"};
-
-  private static final Duration BATCH_INTERVAL = Duration.standardSeconds(1);
-
-  private static final Duration windowDuration = BATCH_INTERVAL.multipliedBy(2);
-
-  @Test
-  public void testFixedWindows() throws Exception {
-    SparkPipelineOptions options = pipelineOptions.withTmpCheckpointDir(checkpointParentDir);
-    options.setStreaming(true);
-
-    // override defaults
-    options.setBatchIntervalMillis(BATCH_INTERVAL.getMillis());
-
-    Pipeline pipeline = Pipeline.create(options);
-
-    PCollection<String> output =
-        pipeline
-            .apply(CreateStream.fromQueue(MANY_WORDS))
-            .setCoder(StringUtf8Coder.of())
-            .apply(Window.<String>into(FixedWindows.of(windowDuration)))
-            .apply(new WordCount.CountWords())
-            .apply(MapElements.via(new WordCount.FormatAsTextFn()));
-
-    PAssertStreaming.runAndAssertContents(pipeline, output, EXPECTED_WORD_COUNTS, windowDuration);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c25a02f4/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/KafkaWriteOnBatchCompleted.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/KafkaWriteOnBatchCompleted.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/KafkaWriteOnBatchCompleted.java
deleted file mode 100644
index 38a5bff..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/KafkaWriteOnBatchCompleted.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.spark.translation.streaming.utils;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.spark.streaming.api.java.JavaStreamingListener;
-import org.apache.spark.streaming.api.java.JavaStreamingListenerBatchCompleted;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Write to Kafka once the OnBatchCompleted hook is activated.
- */
-public class KafkaWriteOnBatchCompleted extends JavaStreamingListener {
-  private static final Logger LOG = LoggerFactory.getLogger(KafkaWriteOnBatchCompleted.class);
-
-  private final Map<String, String> messages;
-  private final List<String> topics;
-  private final Properties producerProps;
-  private final String brokerList;
-  private final boolean once;
-
-  // A flag to state that no more writes should happen.
-  private boolean done = false;
-
-  private KafkaWriteOnBatchCompleted(Map<String, String> messages,
-                                     List<String> topics,
-                                     Properties producerProps,
-                                     String brokerList,
-                                     boolean once) {
-    this.messages = messages;
-    this.topics = topics;
-    this.producerProps = producerProps;
-    this.brokerList = brokerList;
-    this.once = once;
-  }
-
-  public static KafkaWriteOnBatchCompleted once(Map<String, String> messages,
-                                                List<String> topics,
-                                                Properties producerProps,
-                                                String brokerList) {
-    return new KafkaWriteOnBatchCompleted(messages, topics, producerProps, brokerList, true);
-  }
-
-  public static KafkaWriteOnBatchCompleted always(Map<String, String> messages,
-                                                  List<String> topics,
-                                                  Properties producerProps,
-                                                  String brokerList) {
-    return new KafkaWriteOnBatchCompleted(messages, topics, producerProps, brokerList, false);
-  }
-
-  @Override
-  public void onBatchCompleted(JavaStreamingListenerBatchCompleted batchCompleted) {
-    super.onBatchCompleted(batchCompleted);
-    if (!done) {
-      LOG.info("Writing to Kafka after batchTime {} has completed.",
-          batchCompleted.batchInfo().batchTime());
-      write();
-      // once runs once.
-      if (once) {
-        done = true;
-      }
-    }
-  }
-
-  private void write() {
-    Properties props = new Properties();
-    props.putAll(producerProps);
-    props.put("acks", "1");
-    props.put("bootstrap.servers", brokerList);
-    Serializer<String> stringSerializer = new StringSerializer();
-    try (@SuppressWarnings("unchecked") KafkaProducer<String, String> kafkaProducer =
-        new KafkaProducer(props, stringSerializer, stringSerializer)) {
-      for (String topic : topics) {
-        for (Map.Entry<String, String> en : messages.entrySet()) {
-          kafkaProducer.send(new ProducerRecord<>(topic, en.getKey(), en.getValue()));
-        }
-        // await send completion.
-        kafkaProducer.flush();
-      }
-    }
-  }
-}
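
As KafkaStreamingTest above shows, this listener is registered through
SparkContextOptions before the pipeline runs. A brief usage sketch, where
messages, topics, producerProps and brokerList stand for the test's own values:

    // Register the hook so messages are produced only after the first batch completes.
    SparkContextOptions opts = options.as(SparkContextOptions.class);
    opts.setListeners(Collections.<JavaStreamingListener>singletonList(
        KafkaWriteOnBatchCompleted.once(messages, topics, producerProps, brokerList)));
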

http://git-wip-us.apache.org/repos/asf/beam/blob/c25a02f4/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
deleted file mode 100644
index cd9de92..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.spark.translation.streaming.utils;
-
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.not;
-import static org.junit.Assert.assertThat;
-
-import java.io.Serializable;
-import org.apache.beam.runners.spark.SparkPipelineResult;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.transforms.Values;
-import org.apache.beam.sdk.transforms.WithKeys;
-import org.apache.beam.sdk.values.PCollection;
-import org.joda.time.Duration;
-import org.junit.Assert;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Since PAssert doesn't propagate assert exceptions, use Aggregators to assert streaming
- * success/failure counters.
- */
-public final class PAssertStreaming implements Serializable {
-  private static final Logger LOG = LoggerFactory.getLogger(PAssertStreaming.class);
-
-  private PAssertStreaming() {
-  }
-
-  /**
-   * Adds a pipeline run-time assertion that the contents of {@code actual} are {@code expected}.
-   * Note that it is oblivious to windowing, so the assertion will apply indiscriminately to all
-   * windows.
-   */
-  public static <T> SparkPipelineResult runAndAssertContents(
-      Pipeline p,
-      PCollection<T> actual,
-      T[] expected,
-      Duration timeout,
-      boolean stopGracefully) {
-    // Because PAssert does not support non-global windowing, but all our data is in one window,
-    // we set up the assertion directly.
-    actual
-        .apply(WithKeys.<String, T>of("dummy"))
-        .apply(GroupByKey.<String, T>create())
-        .apply(Values.<Iterable<T>>create())
-        .apply(ParDo.of(new AssertDoFn<>(expected)));
-
-    // run the pipeline.
-    SparkPipelineResult res = (SparkPipelineResult) p.run();
-    res.waitUntilFinish(timeout);
-    // validate the assertion succeeded (at least once).
-    int success = res.getAggregatorValue(PAssert.SUCCESS_COUNTER, Integer.class);
-    Assert.assertThat("Success aggregator should be greater than zero.", success, not(0));
-    // validate the assertion didn't fail.
-    int failure = res.getAggregatorValue(PAssert.FAILURE_COUNTER, Integer.class);
-    Assert.assertThat("Failure aggregator should be zero.", failure, is(0));
-
-    LOG.info("PAssertStreaming had {} successful assertions and {} failed.", success, failure);
-    return res;
-  }
-
-  /**
-   * Defaults to stopping gracefully so that tests will finish processing even if slower for
-   * reasons such as a slow runtime environment.
-   */
-  public static <T> SparkPipelineResult runAndAssertContents(
-      Pipeline p,
-      PCollection<T> actual,
-      T[] expected,
-      Duration timeout) {
-    return runAndAssertContents(p, actual, expected, timeout, true);
-  }
-
-  private static class AssertDoFn<T> extends DoFn<Iterable<T>, Void> {
-    private final Aggregator<Integer, Integer> success =
-        createAggregator(PAssert.SUCCESS_COUNTER, Sum.ofIntegers());
-    private final Aggregator<Integer, Integer> failure =
-        createAggregator(PAssert.FAILURE_COUNTER, Sum.ofIntegers());
-    private final T[] expected;
-
-    AssertDoFn(T[] expected) {
-      this.expected = expected;
-    }
-
-    @ProcessElement
-    public void processElement(ProcessContext c) throws Exception {
-      try {
-        assertThat(c.element(), containsInAnyOrder(expected));
-        success.addValue(1);
-      } catch (Throwable t) {
-        failure.addValue(1);
-        LOG.error("PAssert failed expectations.", t);
-        // don't throw t because it will fail this bundle and the failure count will be lost.
-      }
-    }
-  }
-}
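
Once a runner supports the model, which is the premise of this commit, the
aggregator bookkeeping in this helper can be replaced by plain PAssert plus a
bounded wait on the pipeline result. A minimal sketch, assuming a pipeline p
and a PCollection<String> actual:

    // PAssert failures surface through the pipeline result; no counters needed.
    PAssert.that(actual).containsInAnyOrder("k1,v1", "k2,v2");
    PipelineResult result = p.run();
    result.waitUntilFinish(Duration.standardSeconds(30));  // bound the wait for streaming tests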