From: amitsela@apache.org
To: commits@beam.apache.org
Date: Tue, 28 Feb 2017 22:35:18 -0000
Subject: [11/23] beam git commit: Add multi stream and flattened stream tests.

Add multi stream and flattened stream tests. Misc. fixups.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/24ab6053
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/24ab6053
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/24ab6053

Branch: refs/heads/master
Commit: 24ab60538697b284b915157e08218de2e1e42f7b
Parents: da5f849
Author: Sela
Authored: Mon Feb 20 00:14:39 2017 +0200
Committer: Sela
Committed: Wed Mar 1 00:18:02 2017 +0200

----------------------------------------------------------------------
 .../beam/runners/spark/TestSparkRunner.java     |   8 +-
 .../beam/runners/spark/io/CreateStream.java     |  41 +++++
 .../translation/streaming/CreateStreamTest.java | 161 ++++++++++++-------
 3 files changed, 146 insertions(+), 64 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam/blob/24ab6053/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
index 24bc038..985f75d 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
@@ -24,7 +24,8 @@ import static org.hamcrest.Matchers.is;
 import java.io.File;
 import java.io.IOException;
 import org.apache.beam.runners.core.UnboundedReadFromBoundedSource;
-import org.apache.beam.runners.spark.aggregators.AccumulatorSingleton;
+import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator;
+import org.apache.beam.runners.spark.metrics.SparkMetricsContainer;
 import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.BoundedReadFromUnboundedSource;
@@ -118,8 +119,9 @@ public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult> {
     long timeout = sparkOptions.getForcedTimeout();
     SparkPipelineResult result = null;
     try {
-      // clear state of Accumulators and Aggregators.
-      AccumulatorSingleton.clear();
+      // clear state of Aggregators, Metrics and Watermarks.
+      AggregatorsAccumulator.clear();
+      SparkMetricsContainer.clear();
       GlobalWatermarkHolder.clear();
       TestPipelineOptions testPipelineOptions =
           pipeline.getOptions().as(TestPipelineOptions.class);

http://git-wip-us.apache.org/repos/asf/beam/blob/24ab6053/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
index 2149372..70784f1 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
@@ -37,6 +37,47 @@ import org.joda.time.Instant;
 /**
  * Create an input stream from Queue. For SparkRunner tests only.
  *
+ * <p>To properly compose a stream of micro-batches with their Watermarks, please keep in mind
+ * that there are ultimately two queues here - one for batches and another for Watermarks.
+ *
+ * <p>While both queues advance according to Spark's batch-interval, there is a subtle
+ * difference between how data is pushed into the stream and how the Watermarks advance:
+ * Watermarks are advanced in the onBatchCompleted hook, so a Watermark advancement intended
+ * for a specific batch must be stated before that batch.
+ * Also keep in mind that since the Watermark queue is polled once per batch interval, holding
+ * the same Watermark without advancing it must be stated explicitly, or the Watermark will
+ * advance as soon as it can (in the next batch-completed hook).
+ *
+ * <p>Example 1:
+ *
+ * <pre>{@code
+ * CreateStream.<TimestampedValue<String>>withBatchInterval(batchDuration)
+ *     .nextBatch(
+ *         TimestampedValue.of("foo", endOfGlobalWindow),
+ *         TimestampedValue.of("bar", endOfGlobalWindow))
+ *     .advanceNextBatchWatermarkToInfinity();
+ * }</pre>
+ *
+ * The first batch will see the default start-of-time WM of
+ * {@link BoundedWindow#TIMESTAMP_MIN_VALUE} and any following batch will see
+ * the end-of-time WM {@link BoundedWindow#TIMESTAMP_MAX_VALUE}.
+ *
+ * <p>Example 2:
+ *
+ * <pre>{@code
+ * CreateStream.<TimestampedValue<Integer>>withBatchInterval(batchDuration)
+ *     .nextBatch(
+ *         TimestampedValue.of(1, instant))
+ *     .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(20)))
+ *     .nextBatch(
+ *         TimestampedValue.of(2, instant))
+ *     .nextBatch(
+ *         TimestampedValue.of(3, instant))
+ *     .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(30)))
+ * }</pre>
+ *
+ * The first batch will see the start-of-time WM and the second will see the advanced (+20 min.)
+ * WM. The third batch will see the WM advanced to +30 min., because that is the next advancement
+ * of the WM regardless of where it was stated in the construction of the CreateStream.
+ *
+ * //TODO: write a proper Builder enforcing all the rules mentioned above.
+ *
+ * @param <T> stream type.
  */
 public final class CreateStream<T> extends PTransform<PBegin, PCollection<T>> {

http://git-wip-us.apache.org/repos/asf/beam/blob/24ab6053/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
index 0cb33ab..9ee5cc5 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
@@ -52,6 +52,7 @@ import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.Never;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
@@ -68,6 +69,9 @@ import org.junit.rules.TestName;
  *
  * <p>Since Spark is a micro-batch engine, and will process any test-sized input
  * within the same (first) batch, it is important to make sure inputs are ingested across
  * micro-batches using {@link org.apache.spark.streaming.dstream.QueueInputDStream}.
+ * This test suite uses {@link CreateStream} to construct such a
+ * {@link org.apache.spark.streaming.dstream.QueueInputDStream} and to advance the system's WMs.
+ * //TODO: add synchronized/processing time trigger.
  */
 public class CreateStreamTest implements Serializable {

@@ -161,29 +165,6 @@ public class CreateStreamTest implements Serializable {
     p.run();
   }

-//  @Test
-//  @Category({NeedsRunner.class, UsesTestStream.class})
-//  public void testProcessingTimeTrigger() {
-//    TestStream<Long> source = TestStream.create(VarLongCoder.of())
-//        .addElements(TimestampedValue.of(1L, new Instant(1000L)),
-//            TimestampedValue.of(2L, new Instant(2000L)))
-//        .advanceProcessingTime(Duration.standardMinutes(12))
-//        .addElements(TimestampedValue.of(3L, new Instant(3000L)))
-//        .advanceProcessingTime(Duration.standardMinutes(6))
-//        .advanceWatermarkToInfinity();
-//
-//    PCollection<Long> sum = p.apply(source)
-//        .apply(Window.triggering(AfterWatermark.pastEndOfWindow()
-//            .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
-//                .plusDelayOf(Duration.standardMinutes(5)))).accumulatingFiredPanes()
-//            .withAllowedLateness(Duration.ZERO))
-//        .apply(Sum.longsGlobally());
-//
-//    PAssert.that(sum).inEarlyGlobalWindowPanes().containsInAnyOrder(3L, 6L);
-//
-//    p.run();
-//  }
-
   @Test
   public void testDiscardingMode() throws IOException {
     SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(checkpointParentDir);
@@ -249,7 +230,7 @@
     CreateStream<TimestampedValue<String>> source =
         CreateStream.<TimestampedValue<String>>withBatchInterval(batchDuration)
             .nextBatch()
-            .advanceWatermarkForNextBatch(new Instant(-1_000_000))
+            .advanceWatermarkForNextBatch(new Instant(0))
             .nextBatch(
                 TimestampedValue.of("late", lateElementTimestamp),
                 TimestampedValue.of("onTime", new Instant(100)))
@@ -268,8 +249,9 @@
         .apply(Values.<Iterable<String>>create())
         .apply(Flatten.iterables());

-    //TODO: empty panes do not emmit anything so Spark won't evaluate an "empty" assertion.
-//    PAssert.that(values).inWindow(windowFn.assignWindow(lateElementTimestamp)).empty();
+    PAssert.that(values)
+        .inWindow(windowFn.assignWindow(lateElementTimestamp))
+        .empty();
     PAssert.that(values)
         .inWindow(windowFn.assignWindow(new Instant(100)))
         .containsInAnyOrder("onTime");
@@ -308,40 +290,97 @@
     p.run();
   }

-//  @Test
-//  public void testMultipleStreams() throws IOException {
-//    SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(checkpointParentDir);
-//    Pipeline p = Pipeline.create(options);
-//    options.setJobName(testName.getMethodName());
-//    Duration batchDuration = Duration.millis(options.getBatchIntervalMillis());
-//
-//    CreateStream<String> source =
-//        CreateStream.withBatchInterval(batchDuration)
-//            .nextBatch("foo", "bar").advanceWatermarkForNextBatch(new Instant(100))
-//            .nextBatch().advanceNextBatchWatermarkToInfinity();
-//
-////    CreateStream<Integer> other =
-////        CreateStream.withBatchInterval(batchDuration)
-////            .nextBatch(1, 2, 3, 4)
-////            .advanceNextBatchWatermarkToInfinity();
-//
-//    PCollection<String> createStrings =
-//        p.apply("CreateStrings", source).setCoder(StringUtf8Coder.of())
-//            .apply("WindowStrings",
-//                Window.triggering(AfterPane.elementCountAtLeast(2))
-//                    .withAllowedLateness(Duration.ZERO)
-//                    .accumulatingFiredPanes());
-//    PAssert.that(createStrings).containsInAnyOrder("foo", "bar");
-////    PCollection<Integer> createInts =
-////        p.apply("CreateInts", other).setCoder(VarIntCoder.of())
-////            .apply("WindowInts",
-////                Window.triggering(AfterPane.elementCountAtLeast(4))
-////                    .withAllowedLateness(Duration.ZERO)
-////                    .accumulatingFiredPanes());
-////    PAssert.that(createInts).containsInAnyOrder(1, 2, 3, 4);
-//
-//    p.run();
-//  }
+  @Test
+  public void testMultipleStreams() throws IOException {
+    SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(checkpointParentDir);
+    Pipeline p = Pipeline.create(options);
+    options.setJobName(testName.getMethodName());
+    Duration batchDuration = Duration.millis(options.getBatchIntervalMillis());
+
+    CreateStream<String> source =
+        CreateStream.<String>withBatchInterval(batchDuration)
+            .nextBatch("foo", "bar")
+            .advanceNextBatchWatermarkToInfinity();
+    CreateStream<Integer> other =
+        CreateStream.<Integer>withBatchInterval(batchDuration)
+            .nextBatch(1, 2, 3, 4)
+            .advanceNextBatchWatermarkToInfinity();
+
+    PCollection<String> createStrings =
+        p.apply("CreateStrings", source).setCoder(StringUtf8Coder.of())
+            .apply("WindowStrings",
+                Window.triggering(AfterPane.elementCountAtLeast(2))
+                    .withAllowedLateness(Duration.ZERO)
+                    .accumulatingFiredPanes());
+    PAssert.that(createStrings).containsInAnyOrder("foo", "bar");
+
+    PCollection<Integer> createInts =
+        p.apply("CreateInts", other).setCoder(VarIntCoder.of())
+            .apply("WindowInts",
+                Window.triggering(AfterPane.elementCountAtLeast(4))
+                    .withAllowedLateness(Duration.ZERO)
+                    .accumulatingFiredPanes());
+    PAssert.that(createInts).containsInAnyOrder(1, 2, 3, 4);
+
+    p.run();
+  }
+
+  @Test
+  public void testFlattenedWithWatermarkHold() throws IOException {
+    SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(checkpointParentDir);
+    Pipeline p = Pipeline.create(options);
+    options.setJobName(testName.getMethodName());
+    Duration batchDuration = Duration.millis(options.getBatchIntervalMillis());
+
+    Instant instant = new Instant(0);
+    CreateStream<TimestampedValue<Integer>> source1 =
+        CreateStream.<TimestampedValue<Integer>>withBatchInterval(batchDuration)
+            .nextBatch()
+            .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(5)))
+            .nextBatch(
+                TimestampedValue.of(1, instant),
+                TimestampedValue.of(2, instant),
+                TimestampedValue.of(3, instant))
+            .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(10)));
+    CreateStream<TimestampedValue<Integer>> source2 =
+        CreateStream.<TimestampedValue<Integer>>withBatchInterval(batchDuration)
+            .nextBatch()
+            .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(1)))
+            .nextBatch(
+                TimestampedValue.of(4, instant))
+            .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(2)))
+            .nextBatch(
+                TimestampedValue.of(5, instant))
+            .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(5)));
+
+    PCollection<Integer> windowed1 = p
+        .apply(source1).setCoder(TimestampedValue.TimestampedValueCoder.of(VarIntCoder.of()))
+        .apply(ParDo.of(new OnlyValue()))
+        .apply(Window.into(FixedWindows.of(Duration.standardMinutes(5)))
+            .triggering(AfterWatermark.pastEndOfWindow())
+            .accumulatingFiredPanes()
+            .withAllowedLateness(Duration.ZERO));
+    PCollection<Integer> windowed2 = p
+        .apply(source2).setCoder(TimestampedValue.TimestampedValueCoder.of(VarIntCoder.of()))
+        .apply(ParDo.of(new OnlyValue()))
+        .apply(Window.into(FixedWindows.of(Duration.standardMinutes(5)))
+            .triggering(AfterWatermark.pastEndOfWindow())
+            .accumulatingFiredPanes()
+            .withAllowedLateness(Duration.ZERO));
+
+    PCollectionList<Integer> pCollectionList = PCollectionList.of(windowed1).and(windowed2);
+    PCollection<Integer> flattened = pCollectionList.apply(Flatten.pCollections());
+    PCollection<Integer> triggered = flattened
+        .apply(WithKeys.of(1))
+        .apply(GroupByKey.create())
+        .apply(Values.<Iterable<Integer>>create())
+        .apply(Flatten.iterables());
+
+    IntervalWindow window =
+        new IntervalWindow(instant, instant.plus(Duration.standardMinutes(5L)));
+    PAssert.that(triggered).inOnTimePane(window).containsInAnyOrder(1, 2, 3, 4, 5);
+
+    p.run();
+  }

   @Test
   public void testElementAtPositiveInfinityThrows() {
@@ -378,7 +417,7 @@
     OnlyValue() { }

     @ProcessElement
-    public void onlyValue(ProcessContext c) {
+    public void emitTimestampedValue(ProcessContext c) {
       c.outputWithTimestamp(c.element().getValue(), c.element().getTimestamp());
     }
   }
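
----------------------------------------------------------------------

Note on the "hold" semantics: neither Javadoc example above shows the explicit
Watermark hold that the CreateStream class comment describes. The following is
a minimal sketch (not part of this commit), assuming the CreateStream API shown
in this diff and a batchDuration like the one used in CreateStreamTest:

    import org.apache.beam.runners.spark.io.CreateStream;
    import org.apache.beam.sdk.values.TimestampedValue;
    import org.joda.time.Duration;
    import org.joda.time.Instant;

    // Sketch: re-stating the same Watermark for consecutive batches is the
    // explicit "hold" described in the CreateStream Javadoc; otherwise the
    // Watermark advances in the next onBatchCompleted hook.
    Instant instant = new Instant(0);
    CreateStream<TimestampedValue<Integer>> source =
        CreateStream.<TimestampedValue<Integer>>withBatchInterval(batchDuration)
            .nextBatch(TimestampedValue.of(1, instant))
            .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(1)))
            .nextBatch(TimestampedValue.of(2, instant))
            // Hold: the same instant again, so the WM stays at +1 min. for this batch.
            .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(1)))
            .nextBatch(TimestampedValue.of(3, instant))
            // Release to end-of-time so on-time panes can fire.
            .advanceNextBatchWatermarkToInfinity();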