beam-commits mailing list archives

From amits...@apache.org
Subject [07/23] beam git commit: Remove streaming tests that were needed before supporting the model.
Date Tue, 28 Feb 2017 22:35:14 GMT
Remove streaming tests that were needed before supporting the model.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c25a02f4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c25a02f4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c25a02f4

Branch: refs/heads/master
Commit: c25a02f459a30de5b15bb404045258e486a94266
Parents: 96d373f
Author: Sela <ansela@paypal.com>
Authored: Sat Feb 18 22:06:51 2017 +0200
Committer: Sela <ansela@paypal.com>
Committed: Wed Mar 1 00:17:59 2017 +0200

----------------------------------------------------------------------
 .../streaming/EmptyStreamAssertionTest.java     |  87 ------
 .../streaming/FlattenStreamingTest.java         |  82 ------
 .../streaming/KafkaStreamingTest.java           | 270 -------------------
 .../streaming/SimpleStreamingWordCountTest.java |  84 ------
 .../utils/KafkaWriteOnBatchCompleted.java       | 105 --------
 .../streaming/utils/PAssertStreaming.java       | 121 ---------
 6 files changed, 749 deletions(-)
----------------------------------------------------------------------
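
For context on the commit message above: once a runner supports the Beam model's streaming semantics, content assertions can be written with the SDK's own testing utilities instead of the runner-specific helpers deleted below. A minimal sketch of that style follows; it is not part of this commit, the class name and element values are illustrative only, and it assumes org.apache.beam.sdk.testing.TestStream and PAssert plus a runner (e.g. the direct runner) that understands TestStream.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

public class ModelStreamingAssertionSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    // Build a test stream of elements and close it by advancing the watermark to infinity.
    PCollection<String> words = p
        .apply(TestStream.create(StringUtf8Coder.of())
            .addElements("one", "two", "three", "four")
            .advanceWatermarkToInfinity())
        .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(1))));

    // PAssert applies directly to the streaming PCollection, so no aggregator-based
    // success/failure counting (as in the PAssertStreaming utility below) is needed.
    PAssert.that(words).containsInAnyOrder("one", "two", "three", "four");

    p.run().waitUntilFinish();
  }
}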


http://git-wip-us.apache.org/repos/asf/beam/blob/c25a02f4/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java
deleted file mode 100644
index e482945..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.spark.translation.streaming;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.Serializable;
-import java.util.Collections;
-import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.runners.spark.aggregators.ClearAggregatorsRule;
-import org.apache.beam.runners.spark.io.CreateStream;
-import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming;
-import org.apache.beam.runners.spark.translation.streaming.utils.SparkTestPipelineOptionsForStreaming;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.PCollection;
-import org.joda.time.Duration;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-
-/**
- * Test that {@link PAssertStreaming} can tell if the stream is empty.
- */
-public class EmptyStreamAssertionTest implements Serializable {
-
-  private static final String EXPECTED_ERR =
-      "Success aggregator should be greater than zero.\n"
-          + "Expected: not <0>\n"
-          + "     but: was <0>";
-
-  @Rule
-  public TemporaryFolder checkpointParentDir = new TemporaryFolder();
-
-  @Rule
-  public SparkTestPipelineOptionsForStreaming commonOptions =
-      new SparkTestPipelineOptionsForStreaming();
-
-  @Rule
-  public ClearAggregatorsRule clearAggregatorsRule = new ClearAggregatorsRule();
-
-  @Test
-  public void testAssertion() throws Exception {
-    SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(checkpointParentDir);
-    options.setStreaming(true);
-
-    Duration windowDuration = new Duration(options.getBatchIntervalMillis());
-
-    Pipeline pipeline = Pipeline.create(options);
-
-    PCollection<String> output =
-        pipeline
-            .apply(CreateStream.fromQueue(Collections.<Iterable<String>>emptyList()))
-            .setCoder(StringUtf8Coder.of())
-            .apply(Window.<String>into(FixedWindows.of(windowDuration)));
-
-    try {
-      PAssertStreaming.runAndAssertContents(pipeline, output, new String[0],
-          Duration.standardSeconds(1L));
-    } catch (AssertionError e) {
-      assertTrue("Expected error message: " + EXPECTED_ERR + " but got: " + e.getMessage(),
-          e.getMessage().equals(EXPECTED_ERR));
-      return;
-    }
-    fail("assertion should have failed");
-    throw new RuntimeException("unreachable");
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c25a02f4/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
deleted file mode 100644
index fc40bbd..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.spark.translation.streaming;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.runners.spark.io.CreateStream;
-import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming;
-import org.apache.beam.runners.spark.translation.streaming.utils.SparkTestPipelineOptionsForStreaming;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionList;
-import org.joda.time.Duration;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-/**
- * Test Flatten (union) implementation for streaming.
- */
-public class FlattenStreamingTest {
-
-  private static final String[] WORDS_ARRAY_1 = {
-      "one", "two", "three", "four"};
-  private static final List<Iterable<String>> WORDS_QUEUE_1 =
-      Collections.<Iterable<String>>singletonList(Arrays.asList(WORDS_ARRAY_1));
-  private static final String[] WORDS_ARRAY_2 = {
-          "five", "six", "seven", "eight"};
-  private static final List<Iterable<String>> WORDS_QUEUE_2 =
-          Collections.<Iterable<String>>singletonList(Arrays.asList(WORDS_ARRAY_2));
-  private static final String[] EXPECTED_UNION = {
-          "one", "two", "three", "four", "five", "six", "seven", "eight"};
-
-  @Rule
-  public TemporaryFolder checkpointParentDir = new TemporaryFolder();
-
-  @Rule
-  public SparkTestPipelineOptionsForStreaming commonOptions =
-      new SparkTestPipelineOptionsForStreaming();
-
-  @Test
-  public void testFlattenUnbounded() throws Exception {
-    SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(checkpointParentDir);
-    options.setStreaming(true);
-
-    Pipeline p = Pipeline.create(options);
-    PCollection<String> w1 =
-        p.apply(CreateStream.fromQueue(WORDS_QUEUE_1)).setCoder(StringUtf8Coder.of());
-    PCollection<String> windowedW1 =
-        w1.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(1))));
-    PCollection<String> w2 =
-        p.apply(CreateStream.fromQueue(WORDS_QUEUE_2)).setCoder(StringUtf8Coder.of());
-    PCollection<String> windowedW2 =
-        w2.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(1))));
-    PCollectionList<String> list = PCollectionList.of(windowedW1).and(windowedW2);
-    PCollection<String> union = list.apply(Flatten.<String>pCollections());
-
-    PAssertStreaming.runAndAssertContents(p, union, EXPECTED_UNION, Duration.standardSeconds(1L));
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c25a02f4/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
deleted file mode 100644
index 404cb5d..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
+++ /dev/null
@@ -1,270 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.spark.translation.streaming;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.google.common.collect.ImmutableMap;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.ObjectStreamException;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Properties;
-import org.apache.beam.runners.spark.SparkContextOptions;
-import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.runners.spark.translation.streaming.utils.EmbeddedKafkaCluster;
-import org.apache.beam.runners.spark.translation.streaming.utils.KafkaWriteOnBatchCompleted;
-import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming;
-import org.apache.beam.runners.spark.translation.streaming.utils.SparkTestPipelineOptionsForStreaming;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.kafka.KafkaIO;
-import org.apache.beam.sdk.transforms.Distinct;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.spark.streaming.api.java.JavaStreamingListener;
-import org.joda.time.Duration;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-
-/**
- * Test Kafka as input.
- */
-public class KafkaStreamingTest {
-  private static final EmbeddedKafkaCluster.EmbeddedZookeeper EMBEDDED_ZOOKEEPER =
-      new EmbeddedKafkaCluster.EmbeddedZookeeper();
-  private static final EmbeddedKafkaCluster EMBEDDED_KAFKA_CLUSTER =
-      new EmbeddedKafkaCluster(EMBEDDED_ZOOKEEPER.getConnection());
-
-  @BeforeClass
-  public static void init() throws IOException {
-    EMBEDDED_ZOOKEEPER.startup();
-    EMBEDDED_KAFKA_CLUSTER.startup();
-  }
-
-  @Rule
-  public TemporaryFolder checkpointParentDir = new TemporaryFolder();
-
-  @Rule
-  public SparkTestPipelineOptionsForStreaming commonOptions =
-      new SparkTestPipelineOptionsForStreaming();
-
-  @Test
-  public void testEarliest2Topics() throws Exception {
-    Duration batchIntervalDuration = Duration.standardSeconds(5);
-    SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(checkpointParentDir);
-    // provide a generous enough batch-interval to have everything fit in one micro-batch.
-    options.setBatchIntervalMillis(batchIntervalDuration.getMillis());
-    // provide a very generous read time bound, we rely on num records bound here.
-    options.setMinReadTimeMillis(batchIntervalDuration.minus(1).getMillis());
-    // bound the read on the number of messages - 2 topics of 4 messages each.
-    options.setMaxRecordsPerBatch(8L);
-
-    //--- setup
-    // two topics.
-    final String topic1 = "topic1";
-    final String topic2 = "topic2";
-    // messages.
-    final Map<String, String> messages = ImmutableMap.of(
-        "k1", "v1", "k2", "v2", "k3", "v3", "k4", "v4"
-    );
-    // expected.
-    final String[] expected = {"k1,v1", "k2,v2", "k3,v3", "k4,v4"};
-
-    // write to both topics ahead.
-    produce(topic1, messages);
-    produce(topic2, messages);
-
-    //------- test: read and dedup.
-    Pipeline p = Pipeline.create(options);
-
-    Map<String, Object> consumerProps = ImmutableMap.<String, Object>of(
-        "auto.offset.reset", "earliest"
-    );
-
-    KafkaIO.Read<String, String> read = KafkaIO.<String, String>read()
-        .withBootstrapServers(EMBEDDED_KAFKA_CLUSTER.getBrokerList())
-        .withTopics(Arrays.asList(topic1, topic2))
-        .withKeyCoder(StringUtf8Coder.of())
-        .withValueCoder(StringUtf8Coder.of())
-        .updateConsumerProperties(consumerProps);
-
-    PCollection<String> deduped =
-        p.apply(read.withoutMetadata()).setCoder(
-            KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
-        .apply(Window.<KV<String, String>>into(FixedWindows.of(batchIntervalDuration)))
-        .apply(ParDo.of(new FormatKVFn()))
-        .apply(Distinct.<String>create());
-
-    // graceful shutdown will make sure first batch (at least) will finish.
-    Duration timeout = Duration.standardSeconds(1L);
-    PAssertStreaming.runAndAssertContents(p, deduped, expected, timeout);
-  }
-
-  @Test
-  public void testLatest() throws Exception {
-    Duration batchIntervalDuration = Duration.standardSeconds(5);
-    SparkContextOptions options =
-        commonOptions.withTmpCheckpointDir(checkpointParentDir).as(SparkContextOptions.class);
-    // provide a generous enough batch-interval to have everything fit in one micro-batch.
-    options.setBatchIntervalMillis(batchIntervalDuration.getMillis());
-    // provide a very generous read time bound, we rely on num records bound here.
-    options.setMinReadTimeMillis(batchIntervalDuration.minus(1).getMillis());
-    // bound the read on the number of messages - 1 topics of 4 messages.
-    options.setMaxRecordsPerBatch(4L);
-
-    //--- setup
-    final String topic = "topic";
-    // messages.
-    final Map<String, String> messages = ImmutableMap.of(
-        "k1", "v1", "k2", "v2", "k3", "v3", "k4", "v4"
-    );
-    // expected.
-    final String[] expected = {"k1,v1", "k2,v2", "k3,v3", "k4,v4"};
-
-    // write once first batch completes, this will guarantee latest-like behaviour.
-    options.setListeners(Collections.<JavaStreamingListener>singletonList(
-        KafkaWriteOnBatchCompleted.once(messages, Collections.singletonList(topic),
-            EMBEDDED_KAFKA_CLUSTER.getProps(), EMBEDDED_KAFKA_CLUSTER.getBrokerList())));
-
-    //------- test: read and format.
-    Pipeline p = Pipeline.create(options);
-
-    Map<String, Object> consumerProps = ImmutableMap.<String, Object>of(
-        "auto.offset.reset", "latest"
-    );
-
-    KafkaIO.Read<String, String> read = KafkaIO.<String, String>read()
-        .withBootstrapServers(EMBEDDED_KAFKA_CLUSTER.getBrokerList())
-        .withTopics(Collections.singletonList(topic))
-        .withKeyCoder(StringUtf8Coder.of())
-        .withValueCoder(NonKryoSerializableStringCoder.of())
-        .updateConsumerProperties(consumerProps);
-
-    PCollection<String> formatted =
-        p.apply(read.withoutMetadata()).setCoder(
-            KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
-        .apply(Window.<KV<String, String>>into(FixedWindows.of(batchIntervalDuration)))
-        .apply(ParDo.of(new FormatKVFn()));
-
-    // run for more than 1 batch interval, so that reading of latest is attempted in the
-    // first batch with no luck, while the OnBatchCompleted injected-input afterwards will be read
-    // in the second interval.
-    PAssertStreaming.runAndAssertContents(p, formatted, expected, Duration.standardSeconds(3));
-  }
-
-  private static void produce(String topic, Map<String, String> messages) {
-    Serializer<String> stringSerializer = new StringSerializer();
-    try (@SuppressWarnings("unchecked") KafkaProducer<String, String> kafkaProducer =
-        new KafkaProducer(defaultProducerProps(), stringSerializer, stringSerializer)) {
-          // feed topic.
-          for (Map.Entry<String, String> en : messages.entrySet()) {
-            kafkaProducer.send(new ProducerRecord<>(topic, en.getKey(), en.getValue()));
-          }
-          // await send completion.
-          kafkaProducer.flush();
-        }
-  }
-
-  private static Properties defaultProducerProps() {
-    Properties producerProps = new Properties();
-    producerProps.putAll(EMBEDDED_KAFKA_CLUSTER.getProps());
-    producerProps.put("acks", "1");
-    producerProps.put("bootstrap.servers", EMBEDDED_KAFKA_CLUSTER.getBrokerList());
-    return producerProps;
-  }
-
-  @AfterClass
-  public static void tearDown() {
-    EMBEDDED_KAFKA_CLUSTER.shutdown();
-    EMBEDDED_ZOOKEEPER.shutdown();
-  }
-
-  private static class FormatKVFn extends DoFn<KV<String, String>, String> {
-    @ProcessElement
-    public void processElement(ProcessContext c) {
-      c.output(c.element().getKey() + "," + c.element().getValue());
-    }
-  }
-
-  /**
-   * This coder is not Kryo serializable, used to make sure
-   * {@link org.apache.beam.runners.spark.coders.BeamSparkRunnerRegistrator} registers needed
-   * classes to ensure Java serialization is used instead.
-   */
-  private static class NonKryoSerializableStringCoder extends CustomCoder<String>
-      implements Serializable {
-    private Coder<String> stringCoder;
-    private Boolean isSerialized = false;
-
-    private NonKryoSerializableStringCoder() {
-    }
-
-    @JsonCreator
-    public static NonKryoSerializableStringCoder of() {
-      return new NonKryoSerializableStringCoder();
-    }
-
-    private Object readResolve() throws ObjectStreamException {
-      NonKryoSerializableStringCoder deserialized = new NonKryoSerializableStringCoder();
-      deserialized.stringCoder = StringUtf8Coder.of();
-      deserialized.isSerialized = true;
-      return deserialized;
-    }
-
-    private Object writeReplace() throws ObjectStreamException {
-      return new NonKryoSerializableStringCoder();
-    }
-
-    @Override
-    public void encode(String value, OutputStream outStream, Context context)
-        throws CoderException, IOException {
-      if (!isSerialized) {
-        this.stringCoder = StringUtf8Coder.of();
-      }
-      stringCoder.encode(value, outStream, context);
-    }
-
-    @Override
-    public String decode(InputStream inStream, Context context) throws CoderException, IOException {
-      if (!isSerialized) {
-        this.stringCoder = StringUtf8Coder.of();
-      }
-      return stringCoder.decode(inStream, context);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c25a02f4/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
deleted file mode 100644
index 3734cf6..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.spark.translation.streaming;
-
-import com.google.common.collect.Lists;
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.List;
-import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.runners.spark.examples.WordCount;
-import org.apache.beam.runners.spark.io.CreateStream;
-import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming;
-import org.apache.beam.runners.spark.translation.streaming.utils.SparkTestPipelineOptionsForStreaming;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.PCollection;
-import org.joda.time.Duration;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-
-/**
- * Simple word count streaming test.
- */
-public class SimpleStreamingWordCountTest implements Serializable {
-
-  @Rule
-  public TemporaryFolder checkpointParentDir = new TemporaryFolder();
-
-  @Rule
-  public SparkTestPipelineOptionsForStreaming pipelineOptions =
-      new SparkTestPipelineOptionsForStreaming();
-
-  private static final String[] WORDS = {"hi there", "hi", "hi sue bob", "hi sue", "", "bob hi"};
-
-  private static final List<Iterable<String>> MANY_WORDS =
-      Lists.<Iterable<String>>newArrayList(Arrays.asList(WORDS), Arrays.asList(WORDS));
-
-  private static final String[] EXPECTED_WORD_COUNTS = {"hi: 10", "there: 2", "sue: 4", "bob: 4"};
-
-  private static final Duration BATCH_INTERVAL = Duration.standardSeconds(1);
-
-  private static final Duration windowDuration = BATCH_INTERVAL.multipliedBy(2);
-
-  @Test
-  public void testFixedWindows() throws Exception {
-    SparkPipelineOptions options = pipelineOptions.withTmpCheckpointDir(checkpointParentDir);
-    options.setStreaming(true);
-
-    // override defaults
-    options.setBatchIntervalMillis(BATCH_INTERVAL.getMillis());
-
-    Pipeline pipeline = Pipeline.create(options);
-
-    PCollection<String> output =
-        pipeline
-            .apply(CreateStream.fromQueue(MANY_WORDS))
-            .setCoder(StringUtf8Coder.of())
-            .apply(Window.<String>into(FixedWindows.of(windowDuration)))
-            .apply(new WordCount.CountWords())
-            .apply(MapElements.via(new WordCount.FormatAsTextFn()));
-
-    PAssertStreaming.runAndAssertContents(pipeline, output, EXPECTED_WORD_COUNTS, windowDuration);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c25a02f4/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/KafkaWriteOnBatchCompleted.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/KafkaWriteOnBatchCompleted.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/KafkaWriteOnBatchCompleted.java
deleted file mode 100644
index 38a5bff..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/KafkaWriteOnBatchCompleted.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.spark.translation.streaming.utils;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.spark.streaming.api.java.JavaStreamingListener;
-import org.apache.spark.streaming.api.java.JavaStreamingListenerBatchCompleted;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Write to Kafka once the OnBatchCompleted hook is activated.
- */
-public class KafkaWriteOnBatchCompleted extends JavaStreamingListener{
-  private static final Logger LOG = LoggerFactory.getLogger(KafkaWriteOnBatchCompleted.class);
-
-  private final Map<String, String> messages;
-  private final List<String> topics;
-  private final Properties producerProps;
-  private final String brokerList;
-  private final boolean once;
-
-  // A flag to state that no more writes should happen.
-  private boolean done = false;
-
-  private KafkaWriteOnBatchCompleted(Map<String, String> messages,
-                                     List<String> topics,
-                                     Properties producerProps,
-                                     String brokerList,
-                                     boolean once) {
-    this.messages = messages;
-    this.topics = topics;
-    this.producerProps = producerProps;
-    this.brokerList = brokerList;
-    this.once = once;
-  }
-
-  public static KafkaWriteOnBatchCompleted once(Map<String, String> messages,
-                                                List<String> topics,
-                                                Properties producerProps,
-                                                String brokerList) {
-    return new KafkaWriteOnBatchCompleted(messages, topics, producerProps, brokerList, true);
-  }
-
-  public static KafkaWriteOnBatchCompleted always(Map<String, String> messages,
-                                                  List<String> topics,
-                                                  Properties producerProps,
-                                                  String brokerList) {
-    return new KafkaWriteOnBatchCompleted(messages, topics, producerProps, brokerList, false);
-  }
-
-  @Override
-  public void onBatchCompleted(JavaStreamingListenerBatchCompleted batchCompleted) {
-    super.onBatchCompleted(batchCompleted);
-    if (!done) {
-      LOG.info("Writing to Kafka after batchTime {} has completed.",
-          batchCompleted.batchInfo().batchTime());
-      write();
-      // once runs once.
-      if (once) {
-        done = true;
-      }
-    }
-  }
-
-  private void write() {
-    Properties props = new Properties();
-    props.putAll(producerProps);
-    props.put("acks", "1");
-    props.put("bootstrap.servers", brokerList);
-    Serializer<String> stringSerializer = new StringSerializer();
-    try (@SuppressWarnings("unchecked") KafkaProducer<String, String> kafkaProducer =
-        new KafkaProducer(props, stringSerializer, stringSerializer)) {
-          for (String topic: topics) {
-            for (Map.Entry<String, String> en : messages.entrySet()) {
-              kafkaProducer.send(new ProducerRecord<>(topic, en.getKey(), en.getValue()));
-            }
-            // await send completion.
-            kafkaProducer.flush();
-          }
-        }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c25a02f4/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
deleted file mode 100644
index cd9de92..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.spark.translation.streaming.utils;
-
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.not;
-import static org.junit.Assert.assertThat;
-
-import java.io.Serializable;
-import org.apache.beam.runners.spark.SparkPipelineResult;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.transforms.Values;
-import org.apache.beam.sdk.transforms.WithKeys;
-import org.apache.beam.sdk.values.PCollection;
-import org.joda.time.Duration;
-import org.junit.Assert;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Since PAssert doesn't propagate assert exceptions, use Aggregators to assert streaming
- * success/failure counters.
- */
-public final class PAssertStreaming implements Serializable {
-  private static final Logger LOG = LoggerFactory.getLogger(PAssertStreaming.class);
-
-  private PAssertStreaming() {
-  }
-
-  /**
-   * Adds a pipeline run-time assertion that the contents of {@code actual} are {@code expected}.
-   * Note that it is oblivious to windowing, so the assertion will apply indiscriminately to all
-   * windows.
-   */
-  public static <T> SparkPipelineResult runAndAssertContents(
-      Pipeline p,
-      PCollection<T> actual,
-      T[] expected,
-      Duration timeout,
-      boolean stopGracefully) {
-    // Because PAssert does not support non-global windowing, but all our data is in one window,
-    // we set up the assertion directly.
-    actual
-        .apply(WithKeys.<String, T>of("dummy"))
-        .apply(GroupByKey.<String, T>create())
-        .apply(Values.<Iterable<T>>create())
-        .apply(ParDo.of(new AssertDoFn<>(expected)));
-
-    // run the pipeline.
-    SparkPipelineResult res = (SparkPipelineResult) p.run();
-    res.waitUntilFinish(timeout);
-    // validate assertion succeeded (at least once).
-    int success = res.getAggregatorValue(PAssert.SUCCESS_COUNTER, Integer.class);
-    Assert.assertThat("Success aggregator should be greater than zero.", success, not(0));
-    // validate assertion didn't fail.
-    int failure = res.getAggregatorValue(PAssert.FAILURE_COUNTER, Integer.class);
-    Assert.assertThat("Failure aggregator should be zero.", failure, is(0));
-
-    LOG.info("PAssertStreaming had {} successful assertion and {} failed.", success, failure);
-    return res;
-  }
-
-  /**
-   * Default to stop gracefully so that tests will finish processing even if slower for reasons
-   * such as a slow runtime environment.
-   */
-  public static <T> SparkPipelineResult runAndAssertContents(
-      Pipeline p,
-      PCollection<T> actual,
-      T[] expected,
-      Duration timeout) {
-    return runAndAssertContents(p, actual, expected, timeout, true);
-  }
-
-  private static class AssertDoFn<T> extends DoFn<Iterable<T>, Void> {
-    private final Aggregator<Integer, Integer> success =
-        createAggregator(PAssert.SUCCESS_COUNTER, Sum.ofIntegers());
-    private final Aggregator<Integer, Integer> failure =
-        createAggregator(PAssert.FAILURE_COUNTER, Sum.ofIntegers());
-    private final T[] expected;
-
-    AssertDoFn(T[] expected) {
-      this.expected = expected;
-    }
-
-    @ProcessElement
-    public void processElement(ProcessContext c) throws Exception {
-      try {
-        assertThat(c.element(), containsInAnyOrder(expected));
-        success.addValue(1);
-      } catch (Throwable t) {
-        failure.addValue(1);
-        LOG.error("PAssert failed expectations.", t);
-        // don't throw t because it will fail this bundle and the failure count will be lost.
-      }
-    }
-  }
-}

